class Sidekiq::BaseReliableFetch

Constants

DEFAULT_CLEANUP_INTERVAL
DEFAULT_LEASE_INTERVAL

Defines how often we try to take a lease, so that we do not flood our Redis server with SET requests

DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION

How many times a job can be interrupted before it is sent to the interrupted set

HEARTBEAT_INTERVAL
HEARTBEAT_LIFESPAN
HEARTBEAT_RETRY_DELAY
LEASE_KEY
LEGACY_WORKING_QUEUE_REGEX
SCAN_COUNT

Defines the COUNT parameter that will be passed to Redis SCAN command

UnitOfWork
WORKING_QUEUE_PREFIX
WORKING_QUEUE_REGEX

Regexes for matching working queue keys

Attributes

cleanup_interval[R]
last_try_to_take_lease_at[R]
lease_interval[R]
queues[R]
strictly_ordered_queues[R]
use_semi_reliable_fetch[R]

Public Class Methods

heartbeat() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 91
# Writes this process's heartbeat key to Redis with a TTL of
# HEARTBEAT_LIFESPAN, then logs the refresh at debug level.
# worker_dead? treats a process as dead when this key is absent.
def self.heartbeat
  key = heartbeat_key(identity)

  Sidekiq.redis { |redis| redis.set(key, 1, ex: HEARTBEAT_LIFESPAN) }

  Sidekiq.logger.debug("Heartbeat for #{identity}")
end
heartbeat_key(identity) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 103
# Builds the Redis key that stores the heartbeat of the process with the
# given identity. Every colon in the identity is replaced by a dash.
def self.heartbeat_key(identity)
  sanitized = identity.tr(':', '-')

  "reliable-fetcher-heartbeat-#{sanitized}"
end
hostname() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 79
# Returns the host name of the machine this process runs on, as reported
# by Socket.gethostname. Used as the first component of the identity.
def self.hostname
  Socket.gethostname
end
identity() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 87
# Returns the identity string of this process, in the form
# "<hostname>:<pid>:<process nonce>". The value is computed once and
# memoized in a class variable.
def self.identity
  @@identity ||= [hostname, Process.pid, process_nonce].join(':')
end
new(options) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 115
# Builds a fetcher from the given options.
#
# options - hash-like object. :queues is required; :cleanup_interval,
#           :lease_interval and :strict are optional.
#
# Raises ArgumentError when options[:queues] is missing.
def initialize(options)
  queue_names = options[:queues]
  raise ArgumentError, 'missing queue list' unless queue_names

  @config = options

  # Queue names are stored with the "queue:" prefix.
  @queues = queue_names.map { |name| "queue:#{name}" }
  @strictly_ordered_queues = options[:strict] ? true : false

  @cleanup_interval = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  @lease_interval = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)

  # Zero means "never tried", so the first lease attempt is always allowed.
  @last_try_to_take_lease_at = 0

  @interrupted_set = Sidekiq::InterruptedSet.new
end
process_nonce() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 83
# Returns a random hex string (SecureRandom.hex(6)) used as the last
# component of the process identity. The value is generated on first call
# and memoized in a class variable, so later calls return the same string.
def self.process_nonce
  @@process_nonce ||= SecureRandom.hex(6)
end
setup_reliable_fetch!(config) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 47
# Installs a reliable fetch strategy into the given configuration and
# starts the heartbeat thread.
#
# config - either an object that responds to #[] or one that exposes such
#          an object through #options.
#
# The strategy is Sidekiq::SemiReliableFetch when config[:semi_reliable_fetch]
# is truthy, otherwise Sidekiq::ReliableFetch. Returns the heartbeat thread.
def self.setup_reliable_fetch!(config)
  config = config.options unless config.respond_to?(:[])

  strategy_class = config[:semi_reliable_fetch] ? Sidekiq::SemiReliableFetch : Sidekiq::ReliableFetch
  config[:fetch] = strategy_class.new(config)

  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end
start_heartbeat_thread() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 63
# Spawns a background thread that refreshes the heartbeat forever.
#
# After a successful heartbeat the thread sleeps HEARTBEAT_INTERVAL. When a
# StandardError is raised it is logged and the thread sleeps
# HEARTBEAT_RETRY_DELAY before the next attempt. Returns the Thread.
def self.start_heartbeat_thread
  Thread.new do
    loop do
      begin
        heartbeat
        sleep(HEARTBEAT_INTERVAL)
      rescue StandardError => error
        Sidekiq.logger.error("Heartbeat thread error: #{error.message}")
        sleep(HEARTBEAT_RETRY_DELAY)
      end
    end
  end
end
worker_dead?(identity, conn) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 99
# Tells whether the process with the given identity is considered dead,
# i.e. its heartbeat key has no value in Redis.
#
# identity - identity string of the process to check.
# conn     - Redis connection used for the lookup.
def self.worker_dead?(identity, conn)
  heartbeat_value = conn.get(heartbeat_key(identity))

  !heartbeat_value
end
working_queue_name(queue) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 107
# Returns the name of this process's working queue for the given queue:
# "<WORKING_QUEUE_PREFIX>:<queue>:<identity>".
def self.working_queue_name(queue)
  owner = identity

  "#{WORKING_QUEUE_PREFIX}:#{queue}:#{owner}"
end

Public Instance Methods

bulk_requeue(inprogress, _options) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 138
# Requeues (or quarantines) the given in-progress units of work.
#
# inprogress - collection of units of work; each must respond to #job and
#              #queue.
# _options   - ignored. It is optional so that both the two-argument call
#              `bulk_requeue(jobs, options)` and the one-argument call
#              `bulk_requeue(jobs)` are accepted.
#
# For each unit of work, one MULTI transaction hands the job to
# preprocess_interrupted_job and removes it from this process's working
# queue. Returns nil right away when inprogress is empty. Any StandardError
# is logged as a warning and not re-raised.
def bulk_requeue(inprogress, _options = nil)
  return if inprogress.empty?

  Sidekiq.redis do |conn|
    inprogress.each do |unit_of_work|
      conn.multi do |multi|
        # Pass the transaction so the push and the removal happen together.
        preprocess_interrupted_job(unit_of_work.job, unit_of_work.queue, multi)

        multi.lrem(self.class.working_queue_name(unit_of_work.queue), 1, unit_of_work.job)
      end
    end
  end
rescue => e
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{e.message}")
end
retrieve_unit_of_work() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 133
# Abstract hook: subclasses must override this to fetch one unit of work.
#
# Always raises NotImplementedError, naming the receiving class and this
# method.
def retrieve_unit_of_work
  message = "#{self.class} does not implement #{__method__}"

  raise NotImplementedError, message
end
retrieve_work() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 127
# Fetches the next unit of work. Before fetching, it tries to take the
# cleanup lease; when that succeeds, the working queues are cleaned first.
def retrieve_work
  lease_acquired = take_lease
  clean_working_queues! if lease_acquired

  retrieve_unit_of_work
end

Private Instance Methods

allowed_to_take_a_lease?() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 265
# True when more than lease_interval seconds have passed since the last
# attempt to take the lease (wall-clock time from Time.now).
def allowed_to_take_a_lease?
  elapsed = Time.now.to_f - last_try_to_take_lease_at

  elapsed > lease_interval
end
clean_working_queue!(original_queue, working_queue) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 210
# Drains the given working queue: pops jobs from its tail one by one until
# it is empty and hands each to preprocess_interrupted_job together with
# the original queue name.
def clean_working_queue!(original_queue, working_queue)
  Sidekiq.redis do |redis|
    while (payload = redis.rpop(working_queue))
      preprocess_interrupted_job(payload, original_queue)
    end
  end
end
clean_working_queues!() click to toggle source

Detect “old” jobs and requeue them because the worker they were assigned to probably failed miserably.

# File lib/sidekiq/base_reliable_fetch.rb, line 196
# Scans Redis for working queues and drains those whose owning process has
# no heartbeat. Keys from which no queue name or identity can be extracted
# are skipped.
def clean_working_queues!
  Sidekiq.logger.info('Cleaning working queues')

  pattern = "#{WORKING_QUEUE_PREFIX}:queue:*"

  Sidekiq.redis do |redis|
    redis.scan_each(match: pattern, count: SCAN_COUNT) do |working_queue|
      original_queue, identity = extract_queue_and_identity(working_queue)

      next if original_queue.nil? || identity.nil?
      next unless self.class.worker_dead?(identity, redis)

      clean_working_queue!(original_queue, working_queue)
    end
  end
end
extract_queue_and_identity(key) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 181
# Splits a working queue key into the original queue name and the identity
# of the owning process.
#
# Two identity formats exist:
#   new: "{hostname}:{pid}:{randomhex}"
#   old: "{hostname}:{pid}"
# Queue names may contain colons too (namespaces), so one regex covering
# both formats would be unreadable. The new format is tried first and the
# legacy one only when that yields nothing.
#
# Returns [queue, identity] for the new format, otherwise the flattened
# captures of the legacy regex.
def extract_queue_and_identity(key)
  queue_name, owner = key.scan(WORKING_QUEUE_REGEX).flatten

  if queue_name.nil? || owner.nil?
    key.scan(LEGACY_WORKING_QUEUE_REGEX).flatten
  else
    [queue_name, owner]
  end
end
interruption_exhausted?(msg) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 218
# Tells whether the job described by msg has reached the interruption
# limit of its worker class.
#
# A negative limit disables the check (always false). Otherwise the job is
# exhausted once msg['interrupted_count'] reaches the limit.
def interruption_exhausted?(msg)
  limit = max_retries_after_interruption(msg['class'])

  limit >= 0 && msg['interrupted_count'].to_i >= limit
end
max_retries_after_interruption(worker_class) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 224
# Resolves how many interruptions a job of the given worker class may
# suffer before it is quarantined.
#
# worker_class - name of the worker class (normally msg['class']); may be
#                nil or name an unknown constant.
#
# Lookup order, first non-nil/non-false value wins:
#   1. the worker class's sidekiq_options[:max_retries_after_interruption]
#   2. @config[:max_retries_after_interruption]
#   3. DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
def max_retries_after_interruption(worker_class)
  worker_limit =
    begin
      Object.const_get(worker_class).sidekiq_options[:max_retries_after_interruption]
    rescue NameError, TypeError
      # NameError: unknown constant, or the class has no usable sidekiq_options
      # (NoMethodError is a NameError). TypeError: worker_class is nil or not a
      # String/Symbol, e.g. a job payload without a 'class' entry.
      nil
    end

  worker_limit || @config[:max_retries_after_interruption] || DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
end
preprocess_interrupted_job(job, queue, conn = nil) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 156
# Handles one interrupted job: parses the payload, increments its
# 'interrupted_count', then either quarantines it (limit reached) or pushes
# it back to its queue.
#
# job   - serialized job payload.
# queue - queue the job is requeued to.
# conn  - optional connection forwarded to the quarantine/requeue step.
def preprocess_interrupted_job(job, queue, conn = nil)
  msg = Sidekiq.load_json(job)

  previous_count = msg['interrupted_count'].to_i
  msg['interrupted_count'] = previous_count + 1

  return send_to_quarantine(msg, conn) if interruption_exhausted?(msg)

  requeue_job(queue, msg, conn)
end
requeue_job(queue, msg, conn) click to toggle source

To run this method within an existing MULTI transaction, pass that connection as `conn`.

# File lib/sidekiq/base_reliable_fetch.rb, line 169
# Pushes the job back to the head of the given queue and logs the requeue.
#
# queue - Redis list the job is pushed to.
# msg   - job hash; serialized with Sidekiq.dump_json before the push.
# conn  - connection to push on (e.g. an open MULTI); when nil a connection
#         is obtained through with_connection.
def requeue_job(queue, msg, conn)
  payload = Sidekiq.dump_json(msg)

  with_connection(conn) { |redis| redis.lpush(queue, payload) }

  jid = msg['jid']

  Sidekiq.logger.info(
    message: "Pushed job #{jid} back to queue #{queue}",
    jid: jid,
    queue: queue
  )
end
send_to_quarantine(msg, multi_connection = nil) click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 237
# Logs a warning and stores the job in the interrupted set instead of
# requeueing it.
#
# msg              - job hash.
# multi_connection - optional connection forwarded to the interrupted set.
def send_to_quarantine(msg, multi_connection = nil)
  job_class = msg['class']
  jid = msg['jid']

  Sidekiq.logger.warn(
    class: job_class,
    jid: jid,
    message: "Reliable Fetcher: adding dead #{job_class} job #{jid} to interrupted queue"
  )

  @interrupted_set.put(Sidekiq.dump_json(msg), connection: multi_connection)
end
take_lease() click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 255
# Tries to take the cleanup lease.
#
# Returns nil without touching Redis when the previous attempt was too
# recent. Otherwise records the attempt time and issues SET with NX and a
# TTL of cleanup_interval on LEASE_KEY.
# NOTE(review): the return value is whatever Sidekiq.redis returns —
# presumably the result of SET, truthy only when the lease was obtained.
def take_lease
  return unless allowed_to_take_a_lease?

  @last_try_to_take_lease_at = Time.now.to_f

  Sidekiq.redis { |redis| redis.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval) }
end
with_connection(conn) { |conn| ... } click to toggle source

Yields the block with the given connection, or checks out a new one when none is given

# File lib/sidekiq/base_reliable_fetch.rb, line 249
# Yields a Redis connection to the block: the one passed in when present,
# otherwise one obtained from Sidekiq.redis. Returns the block's result on
# the first path and whatever Sidekiq.redis returns on the second.
def with_connection(conn)
  if conn
    yield(conn)
  else
    Sidekiq.redis { |redis_conn| yield(redis_conn) }
  end
end