class Sidekiq::Launcher
The Launcher starts the Manager and Poller threads and provides the process heartbeat.
Constants
- BEAT_PAUSE
- MEMORY_GRABBER
- PROCTITLES
- RTT_READINGS
We run the heartbeat every five seconds. Capture five samples of RTT, log a warning if each sample is above our warning threshold.
- RTT_WARNING_LEVEL
- STATS_TTL
Attributes
fetcher[RW]
manager[RW]
poller[RW]
Public Class Methods
new(options)
click to toggle source
# File lib/sidekiq/launcher.rb, line 24
# Sets up a launcher from the given options hash.
#
# options - Hash of settings. When options[:fetch] is nil or false it is
#           filled in with a BasicFetch built from the same hash; the hash
#           is mutated in place and retained as @options.
def initialize(options)
  options[:fetch] = BasicFetch.new(options) unless options[:fetch]

  @options = options
  @done = false
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
end
Private Class Methods
flush_stats()
click to toggle source
# File lib/sidekiq/launcher.rb, line 102
# Resets the processed/failed counters and writes their values to Redis in
# a single pipeline, updating both the running totals and the per-day keys
# (which are given a STATS_TTL expiry). Returns early when the two counts
# sum to zero. A failure while writing is logged as a warning and not
# re-raised.
def self.flush_stats
  failed_count = Processor::FAILURE.reset
  processed_count = Processor::PROCESSED.reset
  return if failed_count + processed_count == 0

  today = Time.now.utc.strftime("%Y-%m-%d")
  processed_today_key = "stat:processed:#{today}"
  failed_today_key = "stat:failed:#{today}"

  begin
    Sidekiq.redis do |conn|
      conn.pipelined do
        conn.incrby("stat:processed", processed_count)
        conn.incrby(processed_today_key, processed_count)
        conn.expire(processed_today_key, STATS_TTL)

        conn.incrby("stat:failed", failed_count)
        conn.incrby(failed_today_key, failed_count)
        conn.expire(failed_today_key, STATS_TTL)
      end
    end
  rescue => ex
    # we're exiting the process, things might be shut down so don't
    # try to handle the exception
    Sidekiq.logger.warn("Unable to flush stats: #{ex}")
  end
end
Public Instance Methods
quiet()
click to toggle source
Stops this instance from processing any more jobs.
# File lib/sidekiq/launcher.rb, line 40 def quiet @done = true @manager.quiet @poller.terminate end
run()
click to toggle source
# File lib/sidekiq/launcher.rb, line 32
# Starts the heartbeat in its own thread (kept in @thread), then starts the
# poller and the manager, in that order. Returns the result of
# @manager.start.
def run
  @thread = safe_thread("heartbeat") { start_heartbeat }
  @poller.start
  @manager.start
end
stop()
click to toggle source
Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.
# File lib/sidekiq/launcher.rb, line 49
# Shuts the launcher down: computes a monotonic-clock deadline from
# @options[:timeout], quiets the manager, terminates the poller, stops the
# manager against that deadline, asks the fetch strategy to requeue, and
# finally removes this process's heartbeat record.
def stop
  shutdown_started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  deadline = shutdown_started_at + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  @options[:fetch].bulk_requeue([], @options)

  clear_heartbeat
end
stopping?()
click to toggle source
# File lib/sidekiq/launcher.rb, line 66 def stopping? @done end
Private Instance Methods
check_rtt()
click to toggle source
# File lib/sidekiq/launcher.rb, line 199 def check_rtt a = b = 0 Sidekiq.redis do |x| a = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) x.ping b = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :microsecond) end rtt = b - a RTT_READINGS << rtt # Ideal RTT for Redis is < 1000µs # Workable is < 10,000µs # Log a warning if it's a disaster. if RTT_READINGS.all? { |x| x > RTT_WARNING_LEVEL } Sidekiq.logger.warn <<~EOM Your Redis network connection is performing extremely poorly. Last RTT readings were #{RTT_READINGS.buffer.inspect}, ideally these should be < 1000. Ensure Redis is running in the same AZ or datacenter as Sidekiq. If these values are close to 100,000, that means your Sidekiq process may be CPU overloaded; see https://github.com/mperham/sidekiq/discussions/5039 EOM RTT_READINGS.reset end rtt end
clear_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 82
# Removes this process from the "processes" set and unlinks its
# "<identity>:workers" key in one pipeline. Any StandardError is swallowed.
def clear_heartbeat
  workers_key = "#{identity}:workers"

  # Remove record from Redis since we are shutting down.
  # Note we don't stop the heartbeat thread; if the process
  # doesn't actually exit, it'll reappear in the Web UI.
  Sidekiq.redis { |conn|
    conn.pipelined {
      conn.srem("processes", identity)
      conn.unlink(workers_key)
    }
  }
rescue
  # best effort, ignore network errors
end
heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 96
# Rebuilds the process title ($0) from PROCTITLES - each entry is called
# with this launcher and to_data, nil results are dropped, and the rest are
# joined with spaces - then performs the Redis heartbeat.
def heartbeat
  title_parts = PROCTITLES.map { |title_proc| title_proc.call(self, to_data) }
  $0 = title_parts.compact.join(" ")

  ❤
end
memory_usage(pid)
click to toggle source
# File lib/sidekiq/launcher.rb, line 240 def memory_usage(pid) MEMORY_GRABBER.call(pid) end
start_heartbeat()
click to toggle source
# File lib/sidekiq/launcher.rb, line 74 def start_heartbeat loop do heartbeat sleep BEAT_PAUSE end Sidekiq.logger.info("Heartbeat stopping...") end
to_data()
click to toggle source
# File lib/sidekiq/launcher.rb, line 244
# Returns the hash describing this process (hostname, start time, pid, tag,
# concurrency, queues, labels, identity). The hash is built on the first
# call and memoized in @data, so "started_at" reflects that first call.
def to_data
  return @data if @data

  @data = {
    "hostname" => hostname,
    "started_at" => Time.now.to_f,
    "pid" => ::Process.pid,
    "tag" => @options[:tag] || "",
    "concurrency" => @options[:concurrency],
    "queues" => @options[:queues].uniq,
    "labels" => @options[:labels],
    "identity" => identity
  }
end
to_json()
click to toggle source
# File lib/sidekiq/launcher.rb, line 257 def to_json # this data changes infrequently so dump it to a string # now so we don't need to dump it every heartbeat. @json ||= Sidekiq.dump_json(to_data) end
❤()
click to toggle source
# File lib/sidekiq/launcher.rb, line 128
# Runs one heartbeat cycle against Redis:
#
#   1. resets the processed/failed counters and writes them, together with
#      a snapshot of the per-thread worker state, in one transaction;
#   2. measures the Redis round trip via check_rtt;
#   3. refreshes this process's record (with a 60 second expiry) and pops
#      one entry from "<identity>-signals"; if an entry was popped it is
#      sent as a signal to the current process.
#
# Any StandardError raised along the way is logged and swallowed. Counts
# that were reset but not yet written are added back to the counters so a
# later beat can flush them.
def ❤
  key = identity
  fails = procd = 0

  begin
    fails = Processor::FAILURE.reset
    procd = Processor::PROCESSED.reset
    curstate = Processor::WORKER_STATE.dup

    workers_key = "#{key}:workers"
    nowdate = Time.now.utc.strftime("%Y-%m-%d")

    Sidekiq.redis do |conn|
      conn.multi do
        conn.incrby("stat:processed", procd)
        conn.incrby("stat:processed:#{nowdate}", procd)
        conn.expire("stat:processed:#{nowdate}", STATS_TTL)

        conn.incrby("stat:failed", fails)
        conn.incrby("stat:failed:#{nowdate}", fails)
        conn.expire("stat:failed:#{nowdate}", STATS_TTL)

        # Replace the whole worker-state hash with the current snapshot.
        conn.unlink(workers_key)
        curstate.each_pair do |tid, hash|
          conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
        end
        conn.expire(workers_key, 60)
      end
    end

    # The counts are in Redis now. Zero the locals before doing anything
    # else that can raise (check_rtt pings Redis), otherwise the rescue
    # below would add them back and they would be counted twice.
    fails = procd = 0

    rtt = check_rtt
    kb = memory_usage(::Process.pid)

    _, exists, _, _, msg = Sidekiq.redis { |conn|
      conn.multi {
        conn.sadd("processes", key)
        conn.exists?(key)
        conn.hmset(key, "info", to_json,
          "busy", curstate.size,
          "beat", Time.now.to_f,
          "rtt_us", rtt,
          "quiet", @done,
          "rss", kb)
        conn.expire(key, 60)
        conn.rpop("#{key}-signals")
      }
    }

    # first heartbeat or recovering from an outage and need to reestablish our heartbeat
    fire_event(:heartbeat) unless exists

    return unless msg

    ::Process.kill(msg, ::Process.pid)
  rescue => e
    # ignore all redis/network issues
    logger.error("heartbeat: #{e}")
    # don't lose the counts if there was a network issue
    Processor::PROCESSED.incr(procd)
    Processor::FAILURE.incr(fails)
  end
end