class Sidekiq::Launcher

The Launcher is a very simple Actor whose job is to start, monitor and stop the core Actors in Sidekiq. If any of these actors die, the Sidekiq process exits immediately.

Constants

STATS_TTL

Attributes

fetcher[RW]
manager[RW]
poller[RW]

Public Class Methods

new(options) click to toggle source
# File lib/sidekiq/launcher.rb, line 18
def initialize(options)
  @manager = Sidekiq::Manager.new(options)
  @poller = Sidekiq::Scheduled::Poller.new
  @done = false
  @options = options
end

Public Instance Methods

quiet() click to toggle source

Stops this instance from processing any more jobs,

# File lib/sidekiq/launcher.rb, line 33
def quiet
  @done = true
  @manager.quiet
  @poller.terminate
end
run() click to toggle source
# File lib/sidekiq/launcher.rb, line 25
def run
  @thread = safe_thread("heartbeat", &method(:start_heartbeat))
  @poller.start
  @manager.start
end
stop() click to toggle source

Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.

# File lib/sidekiq/launcher.rb, line 42
def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + @options[:timeout]

  @done = true
  @manager.quiet
  @poller.terminate

  @manager.stop(deadline)

  # Requeue everything in case there was a worker who grabbed work while stopped
  # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
  strategy = (@options[:fetch] || Sidekiq::BasicFetch)
  strategy.bulk_requeue([], @options)

  clear_heartbeat
end
stopping?() click to toggle source
# File lib/sidekiq/launcher.rb, line 59
def stopping?
  @done
end

Private Instance Methods

clear_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 158
def clear_heartbeat
  # Remove record from Redis since we are shutting down.
  # Note we don't stop the heartbeat thread; if the process
  # doesn't actually exit, it'll reappear in the Web UI.
  Sidekiq.redis do |conn|
    conn.pipelined do
      conn.srem('processes', identity)
      conn.del("#{identity}:workers")
    end
  end
rescue
  # best effort, ignore network errors
end
heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 65
def heartbeat
  results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, to_data) }
  results.compact!
  $0 = results.join(' ')

  
end
start_heartbeat() click to toggle source
# File lib/sidekiq/launcher.rb, line 127
def start_heartbeat
  while true
    heartbeat
    sleep 5
  end
  Sidekiq.logger.info("Heartbeat stopping...")
end
to_data() click to toggle source
# File lib/sidekiq/launcher.rb, line 135
def to_data
  @data ||= begin
    {
      'hostname' => hostname,
      'started_at' => Time.now.to_f,
      'pid' => $$,
      'tag' => @options[:tag] || '',
      'concurrency' => @options[:concurrency],
      'queues' => @options[:queues].uniq,
      'labels' => @options[:labels],
      'identity' => identity,
    }
  end
end
to_json() click to toggle source
# File lib/sidekiq/launcher.rb, line 150
def to_json
  @json ||= begin
    # this data changes infrequently so dump it to a string
    # now so we don't need to dump it every heartbeat.
    Sidekiq.dump_json(to_data)
  end
end
() click to toggle source
# File lib/sidekiq/launcher.rb, line 73
def 
  key = identity
  fails = procd = 0
  begin
    fails = Processor::FAILURE.reset
    procd = Processor::PROCESSED.reset
    curstate = Processor::WORKER_STATE.dup

    workers_key = "#{key}:workers"
    nowdate = Time.now.utc.strftime("%Y-%m-%d")
    Sidekiq.redis do |conn|
      conn.multi do
        conn.incrby("stat:processed", procd)
        conn.incrby("stat:processed:#{nowdate}", procd)
        conn.expire("stat:processed:#{nowdate}", STATS_TTL)

        conn.incrby("stat:failed", fails)
        conn.incrby("stat:failed:#{nowdate}", fails)
        conn.expire("stat:failed:#{nowdate}", STATS_TTL)

        conn.del(workers_key)
        curstate.each_pair do |tid, hash|
          conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
        end
        conn.expire(workers_key, 60)
      end
    end
    fails = procd = 0

    _, exists, _, _, msg = Sidekiq.redis do |conn|
      conn.multi do
        conn.sadd('processes', key)
        conn.exists(key)
        conn.hmset(key, 'info', to_json, 'busy', curstate.size, 'beat', Time.now.to_f, 'quiet', @done)
        conn.expire(key, 60)
        conn.rpop("#{key}-signals")
      end
    end

    # first heartbeat or recovering from an outage and need to reestablish our heartbeat
    fire_event(:heartbeat) if !exists

    return unless msg

    ::Process.kill(msg, $$)
  rescue => e
    # ignore all redis/network issues
    logger.error("heartbeat: #{e.message}")
    # don't lose the counts if there was a network issue
    Processor::PROCESSED.incr(procd)
    Processor::FAILURE.incr(fails)
  end
end