class Sidekiq::BasicFetch

Constants

TIMEOUT

We want the fetch operation to time out every few seconds so that the thread can check whether the process is shutting down.
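
The reason for the timeout can be illustrated with a minimal sketch. This is not Sidekiq's actual fetcher loop: `fetch_with_timeout` and `poll_until_done` are hypothetical names, and a plain Ruby array stands in for Redis. The point is only that a fetch which returns `nil` on timeout gives the loop a chance to re-check the shutdown flag.

```ruby
# Hypothetical stand-in for a blocking fetch: returns the next job,
# or nil when nothing arrives, as a timed-out blocking pop would.
def fetch_with_timeout(source)
  source.shift
end

# Poll until the shutdown check reports true, collecting whatever was fetched.
# The check runs before every fetch, so an empty source cannot
# block shutdown for longer than one timeout interval.
def poll_until_done(source, shutting_down)
  processed = []
  until shutting_down.call
    job = fetch_with_timeout(source)
    processed << job if job
  end
  processed
end

jobs = ["job-1", "job-2"]
polls = 0
# Pretend a shutdown is requested after four polls of the flag.
shutting_down = -> { (polls += 1) > 4 }
result = poll_until_done(jobs, shutting_down)
```

Without a timeout, a fetch against an empty queue would block indefinitely and the shutdown check would never run again.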

UnitOfWork

Public Class Methods

bulk_requeue(inprogress, options)

Keeping this as a class method makes it pluggable and lets the Manager actor use it. Making it an instance method would make it an asynchronous call to the Fetcher actor.

# File lib/sidekiq/fetch.rb, line 57
def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_name] ||= []
    jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job
  end

  Sidekiq.redis do |conn|
    conn.pipelined do
      jobs_to_requeue.each do |queue, jobs|
        conn.rpush("queue:#{queue}", jobs)
      end
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
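
The grouping step in the method above can be tried without Redis. The following is a sketch under stated assumptions: `Unit` is a hypothetical stand-in for a unit of work that responds to `queue_name` and `job`, and `group_by_queue` is an illustrative helper, not part of Sidekiq.

```ruby
# Hypothetical stand-in for a unit of work: the queue name and the raw job payload.
Unit = Struct.new(:queue_name, :job)

# Group job payloads by queue so each queue needs only one push.
def group_by_queue(inprogress)
  inprogress.each_with_object({}) do |unit, grouped|
    (grouped[unit.queue_name] ||= []) << unit.job
  end
end

inprogress = [
  Unit.new("default", '{"jid":"a"}'),
  Unit.new("mailers", '{"jid":"b"}'),
  Unit.new("default", '{"jid":"c"}')
]
grouped = group_by_queue(inprogress)
# One Redis key per queue, following the "queue:<name>" naming used above.
keys = grouped.keys.map { |name| "queue:#{name}" }
```

Grouping first means the pipeline issues one push per queue rather than one per job.
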
new(options)
# File lib/sidekiq/fetch.rb, line 25
def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
  if @strictly_ordered_queues
    @queues = @queues.uniq
    @queues << TIMEOUT
  end
end

Public Instance Methods

queues_cmd()

The Redis#brpop command is built to take any configured queue weights into account. By default, Redis#brpop returns data from the first listed queue that has pending elements. We rebuild the queue list each time we invoke Redis#brpop to honor weights and avoid queue starvation.

# File lib/sidekiq/fetch.rb, line 44
def queues_cmd
  if @strictly_ordered_queues
    @queues
  else
    queues = @queues.shuffle.uniq
    queues << TIMEOUT
    queues
  end
end
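
The effect of `shuffle.uniq` on weighted queues can be seen in a small sketch. It assumes that weights are expressed by repeating a queue name in the list, once per unit of weight; `weighted_order` is a hypothetical helper, and the seeded `Random` is there only to make the run repeatable.

```ruby
# Assumed input: each queue name repeated once per unit of weight,
# so "critical" (weight 3) appears three times and the others once.
weighted = %w[critical critical critical default low].map { |q| "queue:#{q}" }

# Shuffle, then drop duplicates: a queue listed more often is more
# likely to land near the front, which is where BRPOP looks first.
def weighted_order(queues, random: Random.new)
  queues.shuffle(random: random).uniq
end

order = weighted_order(weighted)

# Count how often each queue comes first over many draws.
rng = Random.new(42)
firsts = Hash.new(0)
1_000.times { firsts[weighted_order(weighted, random: rng).first] += 1 }
```

Every draw still contains each queue exactly once, so lower-weight queues are checked on every fetch; they are just less likely to be checked first.
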
retrieve_work()
# File lib/sidekiq/fetch.rb, line 34
def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end
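
As an illustration of the splat in `retrieve_work`: a blocking pop yields a two-element array of key and value, or `nil` on timeout, and those two elements become the fields of the unit of work. The sketch below assumes a `Work` struct as a hypothetical stand-in for UnitOfWork and a `fake_brpop` helper that imitates the return shape against an in-memory hash; neither is Sidekiq code.

```ruby
# Hypothetical stand-in for UnitOfWork: holds the Redis key and the job payload.
Work = Struct.new(:queue, :job)

# Stand-in for a blocking pop: returns [key, payload] from the first
# non-empty list, or nil when every list is empty (the timeout case).
def fake_brpop(lists, *keys)
  keys.each do |key|
    payload = lists.fetch(key, []).pop
    return [key, payload] if payload
  end
  nil
end

# Mirrors the shape of retrieve_work: splat the pair into the struct,
# or return nil when nothing was fetched.
def retrieve(lists, keys)
  work = fake_brpop(lists, *keys)
  Work.new(*work) if work
end

lists = { "queue:default" => ['{"jid":"a"}'] }
keys = ["queue:critical", "queue:default"]
first = retrieve(lists, keys)
second = retrieve(lists, keys)
```

The first call finds the job on the second key; the second call finds nothing and returns `nil`, which is what lets the caller loop around and check for shutdown.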