class Sidekiq::BaseReliableFetch

Constants

DEFAULT_CLEANUP_INTERVAL
DEFAULT_LEASE_INTERVAL

Defines how often we try to take a lease to not flood our Redis server with SET requests

DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION

How many times a job can be interrupted

HEARTBEAT_INTERVAL
HEARTBEAT_LIFESPAN
HEARTBEAT_RETRY_DELAY
LEASE_KEY
SCAN_COUNT

Defines the COUNT parameter that will be passed to Redis SCAN command

UnitOfWork
WORKING_QUEUE_PREFIX

Attributes

cleanup_interval[R]
last_try_to_take_lease_at[R]
lease_interval[R]
queues[R]
strictly_ordered_queues[R]
use_semi_reliable_fetch[R]

Public Class Methods

bulk_requeue(inprogress, _options) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 88
# Hands every in-progress unit of work to preprocess_interrupted_job and
# removes it from this process's working queue, using one MULTI per job.
#
# Any StandardError raised on the way is logged as a warning instead of
# being propagated to the caller.
#
# @param inprogress [Array] units of work responding to #job and #queue
# @param _options [Object] unused
def self.bulk_requeue(inprogress, _options)
  unless inprogress.empty?
    Sidekiq.redis do |redis|
      inprogress.each do |work|
        redis.multi do |transaction|
          # Both commands are issued on the MULTI connection so the
          # re-push and the removal are sent together.
          preprocess_interrupted_job(work.job, work.queue, transaction)

          transaction.lrem(working_queue_name(work.queue), 1, work.job)
        end
      end
    end
  end
rescue => error
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{error.message}")
end
clean_working_queue!(working_queue) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 104
# Drains one working queue, passing every job it holds to
# preprocess_interrupted_job together with the name of the queue the job
# originally came from.
#
# Each job is popped from the working queue before it is handed over, so
# the two steps are not atomic.
#
# @param working_queue [String] key of the form
#   "<WORKING_QUEUE_PREFIX>:<queue>:<hostname>:<pid>"
def self.clean_working_queue!(working_queue)
  # Strip the leading prefix and the trailing ":<hostname>:<pid>".
  # The prefix is anchored and escaped so that a queue whose own name
  # contains the prefix text (e.g. "networking") is left intact.
  original_queue = working_queue.gsub(/\A#{Regexp.escape(WORKING_QUEUE_PREFIX)}:|:[^:]*:[0-9]*\z/, '')

  Sidekiq.redis do |conn|
    while (job = conn.rpop(working_queue))
      preprocess_interrupted_job(job, original_queue)
    end
  end
end
clean_working_queues!() click to toggle source

Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably.

# File lib/sidekiq/base_reliable_fetch.rb, line 127
# Scans Redis for working queues and cleans every one whose owning worker
# is reported dead by worker_dead?.
#
# Keys that do not end in ":<hostname>:<pid>" are skipped.
def self.clean_working_queues!
  Sidekiq.logger.info('Cleaning working queues')

  Sidekiq.redis do |conn|
    conn.scan_each(match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT) do |key|
      # Key layout: "<WORKING_QUEUE_PREFIX>:queue:<queue name>:<hostname>:<pid>"
      hostname, pid = key.scan(/:([^:]*):([0-9]*)\z/).flatten

      # `next` moves on to the following key; Ruby has no `continue`.
      next if hostname.nil? || pid.nil?

      clean_working_queue!(key) if worker_dead?(hostname, pid, conn)
    end
  end
end
heartbeat() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 76
# Writes this process's heartbeat key to Redis with an expiry of
# HEARTBEAT_LIFESPAN, then emits a debug log line.
def self.heartbeat
  host = hostname
  process_id = pid

  Sidekiq.redis { |redis| redis.set(heartbeat_key(host, process_id), 1, ex: HEARTBEAT_LIFESPAN) }

  Sidekiq.logger.debug("Heartbeat for hostname: #{host} and pid: #{process_id}")
end
heartbeat_key(hostname, pid) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 146
# Builds the Redis key under which a worker's heartbeat is stored.
#
# @param hostname [#to_s]
# @param pid [#to_s]
# @return [String]
def self.heartbeat_key(hostname, pid)
  format('reliable-fetcher-heartbeat-%s-%s', hostname, pid)
end
hostname() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 72
# Returns the local hostname, looked up once via Socket.gethostname and
# then cached.
#
# @return [String]
def self.hostname
  return @hostname if @hostname

  @hostname = Socket.gethostname
end
interruption_exhausted?(msg) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 154
# Tells whether a job has been interrupted as many times as its limit
# allows. A negative limit means the job is never considered exhausted.
#
# @param msg [Hash] job payload; reads 'class' and 'interrupted_count'
# @return [Boolean]
def self.interruption_exhausted?(msg)
  limit = max_retries_after_interruption(msg['class'])

  limit >= 0 && msg['interrupted_count'].to_i >= limit
end
max_retries_after_interruption(worker_class) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 160
# Resolves how many interruptions a job of the given worker class may
# survive. Lookup order:
#   1. the worker class's sidekiq_options[:max_retries_after_interruption]
#   2. Sidekiq.options[:max_retries_after_interruption]
#   3. DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
#
# @param worker_class [String, nil] worker class name from the job payload
# @return [Object] the first non-nil/non-false value in the order above
def self.max_retries_after_interruption(worker_class)
  per_worker =
    begin
      Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption]
    rescue NameError, TypeError
      # NameError: the constant is unknown or has no sidekiq_options.
      # TypeError: the payload carried no usable class name (e.g. nil).
      # Either way, fall through to the global setting.
      nil
    end

  per_worker ||
    Sidekiq.options[:max_retries_after_interruption] ||
    DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
end
new(options) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 209
# Builds a fetcher from the given options hash.
#
# @param options [Hash] reads :queues (required), :strict,
#   :cleanup_interval and :lease_interval; the two intervals fall back to
#   DEFAULT_CLEANUP_INTERVAL and DEFAULT_LEASE_INTERVAL
def initialize(options)
  # Queue names are stored with the "queue:" prefix prepended.
  @queues = options[:queues].map { |name| "queue:#{name}" }
  @strictly_ordered_queues = options[:strict] ? true : false

  @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)

  # Zero means no lease attempt has been recorded yet.
  @last_try_to_take_lease_at = 0
end
pid() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 68
# Returns the id of the current process, read once from ::Process.pid and
# then cached.
#
# @return [Integer]
def self.pid
  return @pid if @pid

  @pid = ::Process.pid
end
preprocess_interrupted_job(job, queue, conn = nil) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 114
# Increments the job's 'interrupted_count' and then either quarantines the
# job (when interruption_exhausted? says so) or pushes it back to its queue.
#
# @param job [String] serialized job payload
# @param queue [String] queue to push the job back to
# @param conn [Object, nil] optional connection forwarded to the helper
def self.preprocess_interrupted_job(job, queue, conn = nil)
  payload = Sidekiq.load_json(job)
  payload['interrupted_count'] = payload['interrupted_count'].to_i + 1

  return send_to_quarantine(payload, conn) if interruption_exhausted?(payload)

  requeue_job(queue, payload, conn)
end
requeue_job(queue, msg, conn) click to toggle source

If you want this method to run in the scope of a MULTI connection, you need to pass that connection in.

# File lib/sidekiq/base_reliable_fetch.rb, line 186
# Pushes the serialized job onto the head of `queue` and logs the push.
#
# @param queue [String] target queue key
# @param msg [Hash] job payload; 'jid' is read for the log entry
# @param conn [Object, nil] connection to use; when nil, with_connection
#   supplies one
def self.requeue_job(queue, msg, conn)
  with_connection(conn) { |redis| redis.lpush(queue, Sidekiq.dump_json(msg)) }

  Sidekiq.logger.info(
    message: "Pushed job #{msg['jid']} back to queue #{queue}",
    jid: msg['jid'],
    queue: queue
  )
end
send_to_quarantine(msg, multi_connection = nil) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 173
# Logs a warning and stores the serialized job in Sidekiq::InterruptedSet.
#
# @param msg [Hash] job payload; 'class' and 'jid' are read for the log
# @param multi_connection [Object, nil] forwarded to InterruptedSet#put as
#   the `connection:` keyword
def self.send_to_quarantine(msg, multi_connection = nil)
  Sidekiq.logger.warn(
    class: msg['class'],
    jid: msg['jid'],
    message: %(Reliable Fetcher: adding dead #{msg['class']} job #{msg['jid']} to interrupted queue)
  )

  serialized = Sidekiq.dump_json(msg)
  quarantine = Sidekiq::InterruptedSet.new
  quarantine.put(serialized, connection: multi_connection)
end
setup_reliable_fetch!(config) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 42
# Installs the reliable (or semi-reliable) fetcher into the given Sidekiq
# configuration and starts the heartbeat thread.
#
# On Sidekiq 6 and newer an instance of the fetcher is stored in
# config.options[:fetch]; on older versions the class itself is stored.
#
# @param config [Object] responds to #options, returning a Hash; reads
#   :semi_reliable_fetch and writes :fetch
def self.setup_reliable_fetch!(config)
  fetch = config.options[:semi_reliable_fetch] ? SemiReliableFetch : ReliableFetch

  # Compare the major version numerically. A plain String comparison
  # against '6' is lexicographic and would rank "10.0.0" below "6".
  major_version = Sidekiq::VERSION.split('.').first.to_i
  fetch = fetch.new(config.options) if major_version >= 6

  config.options[:fetch] = fetch
  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end
start_heartbeat_thread() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 52
# Spawns a background thread that calls `heartbeat` in an endless loop,
# sleeping HEARTBEAT_INTERVAL between successful beats.
#
# @return [Thread] the spawned thread
def self.start_heartbeat_thread
  Thread.new do
    loop do
      begin
        heartbeat

        sleep HEARTBEAT_INTERVAL
      rescue => e
        # Any StandardError raised by the beat (or the sleep) is logged and
        # the loop carries on after HEARTBEAT_RETRY_DELAY, so one failure
        # does not end the thread.
        Sidekiq.logger.error("Heartbeat thread error: #{e.message}")

        sleep HEARTBEAT_RETRY_DELAY
      end
    end
  end
end
with_connection(conn) { |conn| ... } click to toggle source

Yields the block with the given connection, or with a newly checked-out one when none is given.

# File lib/sidekiq/base_reliable_fetch.rb, line 199
# Yields `conn` to the block when one is given; otherwise yields a
# connection obtained from Sidekiq.redis.
#
# @param conn [Object, nil] existing connection, if any
# @return [Object] whatever the block returns
def self.with_connection(conn, &block)
  if conn
    yield(conn)
  else
    Sidekiq.redis { |redis| yield(redis) }
  end
end
worker_dead?(hostname, pid, conn) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 142
# A worker counts as dead when its heartbeat key has no value in Redis.
#
# @param hostname [String]
# @param pid [String, Integer]
# @param conn [Object] connection responding to #get
# @return [Boolean]
def self.worker_dead?(hostname, pid, conn)
  key = heartbeat_key(hostname, pid)

  conn.get(key) ? false : true
end
working_queue_name(queue) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 150
# Builds the key of this process's working queue for the given queue:
# "<WORKING_QUEUE_PREFIX>:<queue>:<hostname>:<pid>".
#
# @param queue [String]
# @return [String]
def self.working_queue_name(queue)
  format('%s:%s:%s:%s', WORKING_QUEUE_PREFIX, queue, hostname, pid)
end

Public Instance Methods

bulk_requeue(inprogress, options) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 84
# Instance-level entry point; forwards to the class-level bulk_requeue of
# the receiver's own class.
#
# @param inprogress [Array] units of work still in progress
# @param options [Object] forwarded unchanged
def bulk_requeue(inprogress, options)
  fetcher_class = self.class
  fetcher_class.bulk_requeue(inprogress, options)
end
retrieve_unit_of_work() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 223
# Placeholder that always raises; the error message names the receiver's
# class and this method.
#
# @raise [NotImplementedError] always
def retrieve_unit_of_work
  message = "#{self.class} does not implement #{__method__}"

  raise NotImplementedError, message
end
retrieve_work() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 217
# Fetches the next unit of work. Before fetching, cleans the working
# queues if this call managed to take the lease.
#
# @return [Object] whatever retrieve_unit_of_work returns
def retrieve_work
  lease_acquired = take_lease
  self.class.clean_working_queues! if lease_acquired

  retrieve_unit_of_work
end

Private Instance Methods

allowed_to_take_a_lease?() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 240
# True once more than `lease_interval` seconds have passed since the last
# recorded attempt to take the lease.
#
# @return [Boolean]
def allowed_to_take_a_lease?
  seconds_since_last_try = Time.now.to_f - last_try_to_take_lease_at

  seconds_since_last_try > lease_interval
end
take_lease() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 230
# Tries to take the lease by setting LEASE_KEY with NX and an expiry of
# `cleanup_interval`. Does nothing and returns nil when
# allowed_to_take_a_lease? is false.
#
# @return [Object, nil] the result of the Redis SET, or nil when skipped
def take_lease
  if allowed_to_take_a_lease?
    # Record the attempt before talking to Redis.
    @last_try_to_take_lease_at = Time.now.to_f

    Sidekiq.redis { |redis| redis.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval) }
  end
end