class Sidekiq::Launcher

The Launcher starts the Manager and Poller threads and provides the process heartbeat.

Constants

BEAT_PAUSE
MEMORY_GRABBER
PROCTITLES
RTT_READINGS

We run the heartbeat every five seconds. Capture five RTT samples and log a warning if every sample is above our warning threshold.

RTT_WARNING_LEVEL
STATS_TTL

Attributes

fetcher[RW]
manager[RW]
poller[RW]

Public Class Methods

new(options) click to toggle source
# File lib/sidekiq/launcher.rb, line 24
# Wires up a launcher from the Sidekiq +options+ hash.
#
# +options+ is mutated: when no +:fetch+ strategy is present a BasicFetch is
# stored under +:fetch+, and #stop later reads that strategy back from
# @options. Builds the Manager (which receives the same hash) and the
# Sidekiq::Scheduled::Poller. The launcher starts out not stopping.
def initialize(options)
  @options = options
  @done = false
  options[:fetch] = BasicFetch.new(options) unless options[:fetch]
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
end

Private Class Methods

flush_stats() click to toggle source
# File lib/sidekiq/launcher.rb, line 102
# Flush the in-memory processed/failed counters to Redis: the all-time
# totals plus per-day (UTC date) keys, each daily key given a STATS_TTL
# expiry. Returns early when both counters are zero.
#
# Commands are issued on the object yielded by +pipelined+ rather than on
# +conn+: redis-rb 4.6 deprecates calling commands on the client inside a
# pipelined block, and older redis-rb versions yield the client itself, so
# this form works with both.
#
# Any StandardError is logged as a warning and swallowed.
def self.flush_stats
  fails = Processor::FAILURE.reset
  procd = Processor::PROCESSED.reset
  return if fails + procd == 0

  nowdate = Time.now.utc.strftime("%Y-%m-%d")
  begin
    Sidekiq.redis do |conn|
      conn.pipelined do |pipeline|
        pipeline.incrby("stat:processed", procd)
        pipeline.incrby("stat:processed:#{nowdate}", procd)
        pipeline.expire("stat:processed:#{nowdate}", STATS_TTL)

        pipeline.incrby("stat:failed", fails)
        pipeline.incrby("stat:failed:#{nowdate}", fails)
        pipeline.expire("stat:failed:#{nowdate}", STATS_TTL)
      end
    end
  rescue => ex
    # we're exiting the process, things might be shut down so don't
    # try to handle the exception
    Sidekiq.logger.warn("Unable to flush stats: #{ex}")
  end
end

Public Instance Methods

quiet() click to toggle source

Stops this instance from processing any more jobs.

# File lib/sidekiq/launcher.rb, line 40
# Stop taking on new work without shutting down.
#
# Flags the launcher as done (see #stopping?), asks the manager to go quiet
# and terminates the scheduled poller. The heartbeat thread is not touched.
def quiet
  @done = true # reported as "quiet" by the heartbeat
  @manager.quiet
  @poller.terminate
end
run() click to toggle source
# File lib/sidekiq/launcher.rb, line 32
# Boot sequence: spawn the "heartbeat" thread running #start_heartbeat and
# keep it in @thread, then start the poller, then the manager.
def run
  beat = method(:start_heartbeat)
  @thread = safe_thread("heartbeat", &beat)

  @poller.start
  @manager.start
end
stop() click to toggle source

Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.

# File lib/sidekiq/launcher.rb, line 49
# Shut the process down. Blocks until the manager has stopped, giving it
# until a deadline of @options[:timeout] seconds from now (monotonic clock),
# then requeues and clears this process's heartbeat record.
def stop
  now = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  deadline = now + @options[:timeout]

  # Same steps as #quiet: flag shutdown, quiet the manager, stop the poller.
  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue anything a worker grabbed after we stopped. A no-op in Sidekiq
  # itself, but required by Sidekiq Pro's fetch strategies.
  @options[:fetch].bulk_requeue([], @options)

  clear_heartbeat
end
stopping?() click to toggle source
# File lib/sidekiq/launcher.rb, line 66
# Whether shutdown has begun: false after construction, true once #quiet or
# #stop has run (both set @done).
def stopping?
  @done
end

Private Instance Methods

check_rtt() click to toggle source
# File lib/sidekiq/launcher.rb, line 199
    # Time one Redis PING in microseconds on the monotonic clock, push the
    # reading into RTT_READINGS and return it.
    #
    # If every reading held in RTT_READINGS exceeds RTT_WARNING_LEVEL, log a
    # warning that includes the buffered readings, then reset the buffer.
    def check_rtt
      started = finished = 0
      Sidekiq.redis do |conn|
        started = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
        conn.ping
        finished = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond)
      end
      elapsed = finished - started
      RTT_READINGS << elapsed

      # Ideal RTT for Redis is < 1000µs; workable is < 10,000µs.
      # Only warn when every buffered sample is past the threshold.
      sustained_slowness = RTT_READINGS.all? { |reading| reading > RTT_WARNING_LEVEL }
      if sustained_slowness
        Sidekiq.logger.warn <<~EOM
          Your Redis network connection is performing extremely poorly.
          Last RTT readings were #{RTT_READINGS.buffer.inspect}, ideally these should be < 1000.
          Ensure Redis is running in the same AZ or datacenter as Sidekiq.
          If these values are close to 100,000, that means your Sidekiq process may be
          CPU overloaded; see https://github.com/mperham/sidekiq/discussions/5039
        EOM
        RTT_READINGS.reset
      end
      elapsed
    end
clear_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 82
# Remove this process's record from Redis since we are shutting down: drop
# its identity from the "processes" set and delete its "<identity>:workers"
# key.
#
# Note we don't stop the heartbeat thread; if the process doesn't actually
# exit, it'll reappear in the Web UI.
#
# Commands go through the object yielded by +pipelined+: redis-rb 4.6
# deprecates issuing them on the client inside the block, and older
# versions yield the client itself, so this works on both.
def clear_heartbeat
  key = identity
  Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      pipeline.srem("processes", key)
      pipeline.unlink("#{key}:workers")
    end
  end
rescue
  # best effort, ignore network errors
end
heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 96
# One beat of the heartbeat thread.
#
# Refreshes the process title ($0) by calling every PROCTITLES callable with
# (launcher, to_data), dropping nil results and joining the rest with spaces.
# It then calls #❤, which flushes stats and worker state and refreshes this
# process's record in Redis. Without that call a beat would only retitle
# the process.
def heartbeat
  $0 = PROCTITLES.map { |title| title.call(self, to_data) }.compact.join(" ")

  ❤
end
memory_usage(pid) click to toggle source
# File lib/sidekiq/launcher.rb, line 240
# Memory usage of +pid+ as computed by the MEMORY_GRABBER callable. The
# heartbeat stores this value under "rss".
def memory_usage(pid)
  MEMORY_GRABBER.(pid)
end
start_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 74
# Body of the heartbeat thread: beat, then sleep BEAT_PAUSE, repeatedly.
#
# Kernel#loop only ends normally when StopIteration is raised inside it, so
# the info log below is reached only in that case. Any other exception from
# #heartbeat propagates out of this method.
def start_heartbeat
  loop do
    heartbeat
    sleep(BEAT_PAUSE)
  end
  Sidekiq.logger.info("Heartbeat stopping...")
end
to_data() click to toggle source
# File lib/sidekiq/launcher.rb, line 244
# Static description of this process, built on first call and memoized in
# @data. It holds the hostname, start time (float epoch seconds), pid, tag
# (defaults to ""), concurrency, de-duplicated queue list, labels and
# identity. Key order is fixed, which keeps #to_json output stable.
def to_data
  return @data if @data

  opts = @options
  @data = {
    "hostname" => hostname,
    "started_at" => Time.now.to_f,
    "pid" => ::Process.pid,
    "tag" => opts[:tag] || "",
    "concurrency" => opts[:concurrency],
    "queues" => opts[:queues].uniq,
    "labels" => opts[:labels],
    "identity" => identity
  }
end
to_json() click to toggle source
# File lib/sidekiq/launcher.rb, line 257
# JSON form of #to_data. That data changes infrequently, so it is dumped
# once and cached in @json instead of being re-serialized on every heartbeat.
def to_json
  return @json if @json

  @json = Sidekiq.dump_json(to_data)
end
❤() click to toggle source
# File lib/sidekiq/launcher.rb, line 128
# Publish this process's heartbeat to Redis. Each beat:
#
# 1. resets the processed/failed counters and, in one MULTI, adds them to
#    the all-time and per-day (UTC) stat keys. The same MULTI rewrites the
#    "<identity>:workers" hash from Processor::WORKER_STATE with a 60s expiry;
# 2. measures Redis RTT (#check_rtt) and memory usage (#memory_usage);
# 3. in a second MULTI, adds the identity to "processes" and refreshes the
#    identity hash (info, busy, beat, rtt_us, quiet, rss) with a 60s expiry.
#    The same MULTI pops one pending signal from "<identity>-signals".
#
# Fires the :heartbeat event when the identity key did not exist before this
# beat. Any popped signal is sent to this process with Process.kill.
#
# StandardErrors are logged, not raised. Counters that were reset but not
# yet written to Redis are restored for the next beat. Once the stats MULTI
# has succeeded they are cleared, so a later failure (e.g. in check_rtt)
# cannot re-add counts that are already persisted.
def ❤
  key = identity
  fails = procd = 0

  begin
    fails = Processor::FAILURE.reset
    procd = Processor::PROCESSED.reset
    curstate = Processor::WORKER_STATE.dup

    workers_key = "#{key}:workers"
    nowdate = Time.now.utc.strftime("%Y-%m-%d")

    Sidekiq.redis do |conn|
      # Commands go through the yielded transaction: redis-rb 4.6 deprecates
      # calling them on the client inside #multi (older versions yield the
      # client itself, so this form works on both).
      conn.multi do |transaction|
        transaction.incrby("stat:processed", procd)
        transaction.incrby("stat:processed:#{nowdate}", procd)
        transaction.expire("stat:processed:#{nowdate}", STATS_TTL)

        transaction.incrby("stat:failed", fails)
        transaction.incrby("stat:failed:#{nowdate}", fails)
        transaction.expire("stat:failed:#{nowdate}", STATS_TTL)

        transaction.unlink(workers_key)
        curstate.each_pair do |tid, hash|
          transaction.hset(workers_key, tid, Sidekiq.dump_json(hash))
        end
        transaction.expire(workers_key, 60)
      end
    end

    # The counts are persisted now; clear them before anything else can
    # fail so the rescue below does not add them back a second time.
    fails = procd = 0

    rtt = check_rtt
    kb = memory_usage(::Process.pid)

    _, exists, _, _, msg = Sidekiq.redis { |conn|
      conn.multi { |transaction|
        transaction.sadd("processes", key)
        transaction.exists?(key)
        transaction.hmset(key, "info", to_json,
          "busy", curstate.size,
          "beat", Time.now.to_f,
          "rtt_us", rtt,
          "quiet", @done,
          "rss", kb)
        transaction.expire(key, 60)
        transaction.rpop("#{key}-signals")
      }
    }

    # first heartbeat or recovering from an outage and need to reestablish our heartbeat
    fire_event(:heartbeat) unless exists

    return unless msg

    ::Process.kill(msg, ::Process.pid)
  rescue => e
    # ignore all redis/network issues
    logger.error("heartbeat: #{e}")
    # don't lose the counts if there was a network issue
    Processor::PROCESSED.incr(procd)
    Processor::FAILURE.incr(fails)
  end
end