class Sidekiq::BaseReliableFetch
Constants
- DEFAULT_CLEANUP_INTERVAL
- DEFAULT_LEASE_INTERVAL
Minimum number of seconds between attempts to take the cleanup lease, so that we don't flood the Redis server with SET requests
- DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
How many times a job can be interrupted before it is moved to the interrupted set instead of being requeued
- HEARTBEAT_INTERVAL
- HEARTBEAT_LIFESPAN
- HEARTBEAT_RETRY_DELAY
- LEASE_KEY
- SCAN_COUNT
Defines the COUNT parameter that will be passed to Redis SCAN command
- UnitOfWork
- WORKING_QUEUE_PREFIX
Attributes
cleanup_interval[R]
last_try_to_take_lease_at[R]
lease_interval[R]
queues[R]
strictly_ordered_queues[R]
use_semi_reliable_fetch[R]
Public Class Methods
bulk_requeue(inprogress, _options)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 86
#
# Hands every in-progress unit of work back to +preprocess_interrupted_job+
# (requeue or quarantine) and removes it from this process's working queue.
# Each job is processed inside its own MULTI block, so the requeue and the
# LREM are queued on the same transaction.
#
# Any error is logged as a warning and swallowed (best effort).
#
# @param inprogress [Array] units of work responding to #job and #queue
# @param _options [Object] unused
def self.bulk_requeue(inprogress, _options)
  return if inprogress.empty?

  Sidekiq.redis do |redis|
    inprogress.each do |work|
      redis.multi do |transaction|
        preprocess_interrupted_job(work.job, work.queue, transaction)
        transaction.lrem(working_queue_name(work.queue), 1, work.job)
      end
    end
  end
rescue => error
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{error.message}")
end
clean_working_queue!(working_queue)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 102
#
# Drains one working queue, handing every popped job to
# +preprocess_interrupted_job+ (which requeues or quarantines it).
#
# The original queue name is recovered by stripping the leading
# "<WORKING_QUEUE_PREFIX>:" and the trailing ":<hostname>:<pid>" from the
# key. For example, with a "working" prefix,
# "working:queue:default:host:123" becomes "queue:default".
#
# @param working_queue [String] Redis key of the working queue to drain
def self.clean_working_queue!(working_queue)
  # The prefix is anchored with \A. Unanchored, gsub would also strip
  # "<prefix>:" occurring inside a queue name (e.g. "queue:reworking:...").
  original_queue = working_queue.gsub(/\A#{Regexp.escape(WORKING_QUEUE_PREFIX)}:|:[^:]*:[0-9]*\z/, '')

  Sidekiq.redis do |conn|
    while (job = conn.rpop(working_queue))
      preprocess_interrupted_job(job, original_queue)
    end
  end
end
clean_working_queues!()
click to toggle source
Detect jobs left in working queues whose owning process no longer has a live heartbeat, and requeue them (the worker they were assigned to most likely crashed).
# File lib/sidekiq/base_reliable_fetch.rb, line 125
#
# Scans Redis for every working queue and drains those whose owning process
# (identified by the trailing hostname and pid in the key) has no heartbeat
# key, so their jobs are requeued instead of being lost.
def self.clean_working_queues!
  Sidekiq.logger.info('Cleaning working queues')

  Sidekiq.redis do |conn|
    conn.scan_each(match: "#{WORKING_QUEUE_PREFIX}:queue:*", count: SCAN_COUNT) do |key|
      # Example key: "<WORKING_QUEUE_PREFIX>:queue:{queue_name}:{hostname}:{PID}"
      hostname, pid = key.scan(/:([^:]*):([0-9]*)\z/).flatten
      # Skip keys that do not end in ":hostname:pid". Ruby has no `continue`
      # keyword (it raised NameError here); `next` moves on to the next key.
      next if hostname.nil? || pid.nil?

      clean_working_queue!(key) if worker_dead?(hostname, pid, conn)
    end
  end
end
heartbeat()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 78
#
# Writes this process's heartbeat key to Redis with an expiry of
# HEARTBEAT_LIFESPAN (SET ... EX), then logs a debug line. A process that
# stops calling this loses its key once it expires, which is what
# +worker_dead?+ checks for.
def self.heartbeat
  Sidekiq.redis do |redis|
    key = heartbeat_key(hostname, pid)
    redis.set(key, 1, ex: HEARTBEAT_LIFESPAN)
  end

  Sidekiq.logger.debug("Heartbeat for hostname: #{hostname} and pid: #{pid}")
end
heartbeat_key(hostname, pid)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 144
#
# Redis key under which the process identified by +hostname+ and +pid+
# publishes its heartbeat.
#
# @return [String] "reliable-fetcher-heartbeat-<hostname>-<pid>"
def self.heartbeat_key(hostname, pid)
  format('reliable-fetcher-heartbeat-%s-%s', hostname, pid)
end
hostname()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 74
#
# Hostname of the machine running this process. It is looked up on first
# use and cached in a class-level instance variable.
def self.hostname
  return @hostname if @hostname

  @hostname = Socket.gethostname
end
interruption_exhausted?(msg)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 152
#
# Whether +msg+ has been interrupted at least as many times as its worker
# allows. A negative limit means the job is never considered exhausted.
#
# @param msg [Hash] parsed job payload; reads 'class' and 'interrupted_count'
# @return [Boolean]
def self.interruption_exhausted?(msg)
  # Resolve the limit once; the lookup does a constant lookup and reads options.
  limit = max_retries_after_interruption(msg['class'])
  return false if limit < 0

  msg['interrupted_count'].to_i >= limit
end
max_retries_after_interruption(worker_class)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 158
#
# How many interruptions a job of +worker_class+ may have before it is
# quarantined. The value is taken from the first of these that is set:
#   1. the worker's own sidekiq_options
#   2. Sidekiq.options[:max_retries_after_interruption]
#   3. DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
#
# @param worker_class [String] worker class name taken from the job payload
# @return [Integer] the limit; a negative value means "never quarantine"
def self.max_retries_after_interruption(worker_class)
  per_worker = begin
    worker_options = Object.const_get(worker_class).sidekiq_options
    # Sidekiq normally stringifies sidekiq_options keys (verify for the
    # Sidekiq version in use), so read the String key and keep the Symbol
    # key as a fallback.
    worker_options['max_retries_after_interruption'] || worker_options[:max_retries_after_interruption]
  rescue NameError
    # Unknown class, or one without sidekiq_options (NoMethodError is a NameError).
    nil
  end

  per_worker ||
    Sidekiq.options[:max_retries_after_interruption] ||
    DEFAULT_MAX_RETRIES_AFTER_INTERRUPTION
end
new(options)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 207
#
# @param options [Hash] fetcher configuration. Keys read:
#   :cleanup_interval - TTL used for the cleanup lease key
#                       (default DEFAULT_CLEANUP_INTERVAL)
#   :lease_interval   - minimum seconds between lease attempts
#                       (default DEFAULT_LEASE_INTERVAL)
#   :strict           - coerced to a boolean and stored as strictly_ordered_queues
#   :queues           - queue names; each is stored with a "queue:" prefix
def initialize(options)
  @cleanup_interval          = options.fetch(:cleanup_interval, DEFAULT_CLEANUP_INTERVAL)
  @lease_interval            = options.fetch(:lease_interval, DEFAULT_LEASE_INTERVAL)
  @last_try_to_take_lease_at = 0
  @strictly_ordered_queues   = options[:strict] ? true : false
  @queues                    = options[:queues].map { |name| "queue:#{name}" }
end
pid()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 70
#
# PID of the current process.
#
# Deliberately not memoized. A cached value would be inherited by a forked
# child, which would then share the parent's working-queue names and
# heartbeat key. Process.pid is cheap, and after a fork it returns the
# child's own pid.
def self.pid
  ::Process.pid
end
preprocess_interrupted_job(job, queue, conn = nil)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 112
#
# Parses a raw job and increments its 'interrupted_count'. The job is then
# sent to the interrupted set if its interruption budget is exhausted, or
# pushed back onto +queue+ otherwise.
#
# @param job [String] serialized job payload
# @param queue [String] queue to requeue onto
# @param conn [Object, nil] optional connection (e.g. a MULTI transaction)
#   forwarded to the chosen handler
# @return the return value of the chosen handler
def self.preprocess_interrupted_job(job, queue, conn = nil)
  payload = Sidekiq.load_json(job)
  payload['interrupted_count'] = payload['interrupted_count'].to_i + 1

  return send_to_quarantine(payload, conn) if interruption_exhausted?(payload)

  requeue_job(queue, payload, conn)
end
requeue_job(queue, msg, conn)
click to toggle source
To run this method inside a MULTI transaction, pass the multi connection as `conn`.
# File lib/sidekiq/base_reliable_fetch.rb, line 184
#
# LPUSHes the serialized +msg+ back onto +queue+ and logs the requeue.
#
# @param queue [String] destination queue key (e.g. "queue:default")
# @param msg [Hash] job payload; serialized with Sidekiq.dump_json
# @param conn [Object, nil] connection to push through (pass a MULTI
#   transaction to make the push part of it). Passed to +with_connection+,
#   which falls back to Sidekiq.redis when it is nil.
def self.requeue_job(queue, msg, conn)
  # The block parameter is named `redis` so it does not shadow `conn`.
  with_connection(conn) do |redis|
    redis.lpush(queue, Sidekiq.dump_json(msg))
  end

  Sidekiq.logger.info(
    message: "Pushed job #{msg['jid']} back to queue #{queue}",
    jid: msg['jid'],
    queue: queue
  )
end
send_to_quarantine(msg, multi_connection = nil)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 171
#
# Logs a warning and adds the serialized job to Sidekiq::InterruptedSet.
#
# @param msg [Hash] job payload; 'class' and 'jid' are included in the log
# @param multi_connection [Object, nil] optional connection forwarded to
#   InterruptedSet#put as +connection:+
# @return the return value of InterruptedSet#put
def self.send_to_quarantine(msg, multi_connection = nil)
  worker = msg['class']
  jid = msg['jid']

  Sidekiq.logger.warn(
    class: worker,
    jid: jid,
    message: "Reliable Fetcher: adding dead #{worker} job #{jid} to interrupted queue"
  )

  serialized = Sidekiq.dump_json(msg)
  Sidekiq::InterruptedSet.new.put(serialized, connection: multi_connection)
end
setup_reliable_fetch!(config)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 42
#
# Installs the reliable fetcher into +config+ and starts the heartbeat
# thread. When config.options[:semi_reliable_fetch] is truthy,
# Sidekiq::SemiReliableFetch is used; otherwise Sidekiq::ReliableFetch.
#
# @param config [Object] object exposing a mutable #options hash
# @return the value returned by +start_heartbeat_thread+
def self.setup_reliable_fetch!(config)
  options = config.options
  options[:fetch] = options[:semi_reliable_fetch] ? Sidekiq::SemiReliableFetch : Sidekiq::ReliableFetch

  Sidekiq.logger.info('GitLab reliable fetch activated!')

  start_heartbeat_thread
end
start_heartbeat_thread()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 54
#
# Spawns a background thread that calls +heartbeat+ forever. The thread
# sleeps HEARTBEAT_INTERVAL after a successful beat. After a StandardError
# it logs the error and sleeps HEARTBEAT_RETRY_DELAY instead, so a
# transient failure never kills the thread.
#
# @return [Thread] the heartbeat thread
def self.start_heartbeat_thread
  Thread.new do
    loop do
      heartbeat
      sleep HEARTBEAT_INTERVAL
    rescue => failure
      Sidekiq.logger.error("Heartbeat thread error: #{failure.message}")
      sleep HEARTBEAT_RETRY_DELAY
    end
  end
end
with_connection(conn) { |conn| ... }
click to toggle source
Yields the given connection to the block, or obtains one via Sidekiq.redis when none is given.
# File lib/sidekiq/base_reliable_fetch.rb, line 197
#
# Runs the block with +conn+ when one is given (e.g. a MULTI transaction).
# Otherwise it runs the block with a connection obtained from Sidekiq.redis.
#
# @param conn [Object, nil] existing connection, or nil
# @yieldparam connection [Object] the connection to use
# @return the block's return value
def self.with_connection(conn, &block)
  return yield(conn) if conn

  # Forward the block directly; the previous `{ |conn| yield(conn) }`
  # wrapper shadowed the `conn` parameter and left `&block` unused.
  Sidekiq.redis(&block)
end
worker_dead?(hostname, pid, conn)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 140
#
# A worker counts as dead when its heartbeat key is absent (GET returns a
# falsy value).
#
# @return [Boolean]
def self.worker_dead?(hostname, pid, conn)
  conn.get(heartbeat_key(hostname, pid)) ? false : true
end
working_queue_name(queue)
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 148
#
# Name of this process's working queue for +queue+:
# "<WORKING_QUEUE_PREFIX>:<queue>:<hostname>:<pid>".
def self.working_queue_name(queue)
  format('%s:%s:%s:%s', WORKING_QUEUE_PREFIX, queue, hostname, pid)
end
Public Instance Methods
retrieve_unit_of_work()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 221
#
# Abstract hook: subclasses must return the next unit of work.
#
# @raise [NotImplementedError] always, in this base class
def retrieve_unit_of_work
  message = "#{self.class} does not implement #{__method__}"
  raise NotImplementedError, message
end
retrieve_work()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 215
#
# Fetches the next unit of work. If this process manages to take the
# cleanup lease, it first requeues jobs stranded in dead workers' working
# queues.
def retrieve_work
  lease_taken = take_lease
  self.class.clean_working_queues! if lease_taken

  retrieve_unit_of_work
end
Private Instance Methods
allowed_to_take_a_lease?()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 238
#
# True once more than +lease_interval+ seconds (wall clock) have passed
# since the last lease attempt.
def allowed_to_take_a_lease?
  elapsed = Time.now.to_f - last_try_to_take_lease_at
  elapsed > lease_interval
end
take_lease()
click to toggle source
# File lib/sidekiq/base_reliable_fetch.rb, line 228
#
# Tries to acquire the cluster-wide cleanup lease. LEASE_KEY is written with
# SET NX and a TTL of +cleanup_interval+, so at most one process holds it
# until it expires. Attempts are rate-limited by +allowed_to_take_a_lease?+.
#
# @return the result of the SET call (truthy when the lease was taken), or
#   nil when an attempt was made too recently
def take_lease
  if allowed_to_take_a_lease?
    @last_try_to_take_lease_at = Time.now.to_f
    Sidekiq.redis { |redis| redis.set(LEASE_KEY, 1, nx: true, ex: cleanup_interval) }
  end
end