class Sidekiq::Processor
The Processor is a standalone thread which:
1. fetches a job from Redis
2. executes the job:
   a. instantiate the Worker
   b. run the middleware chain
   c. call #perform
A Processor can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died)
If an error occurs in the job execution, the Processor calls the Manager to create a new one to replace itself and exits.
Constants
- FAILURE
- PROCESSED
- WORKER_STATE
Attributes
job[R]
thread[R]
Public Class Methods
new(mgr)
click to toggle source
# File lib/sidekiq/processor.rb, line 31
# Sets up a processor owned by the given manager. No thread is created here;
# the thread slot stays nil until #start is called.
#
# mgr - the owning manager. Its #options are consulted for an optional
#       :fetch strategy class and an optional :job_logger class; when either
#       is absent the Sidekiq default class is used.
def initialize(mgr)
  @mgr = mgr
  @down = false
  @done = false
  @job = nil
  @thread = nil

  # The fetch strategy is constructed with the manager's options.
  fetch_class = mgr.options[:fetch] || Sidekiq::BasicFetch
  @strategy = fetch_class.new(mgr.options)

  @reloader = Sidekiq.options[:reloader]

  logger_class = mgr.options[:job_logger] || Sidekiq::JobLogger
  @logging = logger_class.new

  @retrier = Sidekiq::JobRetry.new
end
Public Instance Methods
kill(wait=false)
click to toggle source
# File lib/sidekiq/processor.rb, line 49
# Marks the processor as done and interrupts its thread by raising
# Sidekiq::Shutdown inside it.
#
# wait - when true, block on the thread's value after interrupting it.
#
# Returns nil when there is no thread or when wait is false; otherwise
# whatever the thread's #value returns.
def kill(wait=false)
  @done = true

  if @thread
    # unlike the other actors, terminate does not wait
    # for the thread to finish because we don't know how
    # long the job will take to finish.  Instead we
    # provide a `kill` method to call after the shutdown
    # timeout passes.
    @thread.raise ::Sidekiq::Shutdown
    wait ? @thread.value : nil
  end
end
start()
click to toggle source
# File lib/sidekiq/processor.rb, line 61
# Launches the processor thread running #run, unless one was already
# started. Repeated calls return the thread created by the first call.
def start
  return @thread if @thread

  @thread = safe_thread("processor", &method(:run))
end
terminate(wait=false)
click to toggle source
# File lib/sidekiq/processor.rb, line 43
# Flags the processor as done so the run loop exits after its current
# iteration. The thread is not interrupted.
#
# wait - when true and a thread exists, block on the thread's value.
#
# Returns the thread's value when waited on, nil otherwise.
def terminate(wait=false)
  @done = true
  @thread.value if @thread && wait
end
Private Instance Methods
cloned(thing)
click to toggle source
Deep clone the arguments passed to the worker so that if the job fails, what is pushed back onto Redis hasn't been mutated by the worker.
# File lib/sidekiq/processor.rb, line 262
# Produces a deep copy of the given object by round-tripping it through
# Marshal, so mutations of the copy never reach the original.
def cloned(thing)
  serialized = Marshal.dump(thing)
  Marshal.load(serialized)
end
constantize(str)
click to toggle source
# File lib/sidekiq/processor.rb, line 266
# Resolves a "::"-separated constant path (with or without a leading "::")
# to the constant it names, starting the walk at Object.
#
# Each segment is looked up directly on the constant found so far; when it
# is not defined there, that constant's const_missing hook is invoked.
def constantize(str)
  segments = str.split('::')
  # Drop the blank first segment produced by a leading "::".
  segments.shift if segments.first.to_s.empty?

  scope = Object
  segments.each do |segment|
    # the false flag limits search for name to under the constant namespace
    #   which mimics Rails' behaviour
    scope = if scope.const_defined?(segment, false)
      scope.const_get(segment, false)
    else
      scope.const_missing(segment)
    end
  end
  scope
end
dispatch(job_hash, queue) { |worker| ... }
click to toggle source
# File lib/sidekiq/processor.rb, line 117 def dispatch(job_hash, queue) # since middleware can mutate the job hash # we clone here so we report the original # job structure to the Web UI pristine = cloned(job_hash) Sidekiq::Logging.with_job_hash_context(job_hash) do @retrier.global(pristine, queue) do @logging.call(job_hash, queue) do stats(pristine, queue) do # Rails 5 requires a Reloader to wrap code execution. In order to # constantize the worker and instantiate an instance, we have to call # the Reloader. It handles code loading, db connection management, etc. # Effectively this block denotes a "unit of work" to Rails. @reloader.call do klass = constantize(job_hash['class']) worker = klass.new worker.jid = job_hash['jid'] @retrier.local(worker, pristine, queue) do yield worker end end end end end end end
execute_job(worker, cloned_args)
click to toggle source
# File lib/sidekiq/processor.rb, line 190
# Runs the job by calling the worker's #perform with the argument list
# splatted into positional arguments.
#
# worker      - the object that responds to #perform.
# cloned_args - array of arguments to pass to #perform.
#
# Returns whatever #perform returns.
def execute_job(worker, cloned_args)
  worker.perform(*cloned_args)
end
fetch()
click to toggle source
# File lib/sidekiq/processor.rb, line 97
# Retrieves one unit of work via #get_one.
#
# If work was retrieved but the processor has already been flagged as done,
# the work is requeued and nil is returned instead.
#
# Returns the unit of work, or nil when there is nothing to process.
def fetch
  work = get_one
  return work unless work && @done

  work.requeue
  nil
end
get_one()
click to toggle source
# File lib/sidekiq/processor.rb, line 86
# Asks the fetch strategy for one unit of work.
#
# When a previous fetch failure had been recorded in @down, a successful
# retrieval logs how long the outage lasted and clears the marker.
#
# Returns the retrieved work, nil when Sidekiq::Shutdown is raised, or the
# result of #handle_fetch_exception for any other StandardError.
def get_one
  job = @strategy.retrieve_work

  if @down
    logger.info { "Redis is online, #{::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @down} sec downtime" }
    @down = nil
  end

  job
rescue Sidekiq::Shutdown
  # Shutting down: there is nothing to hand back.
  nil
rescue => ex
  handle_fetch_exception(ex)
end
handle_fetch_exception(ex)
click to toggle source
# File lib/sidekiq/processor.rb, line 107
# Reacts to a failure while fetching work.
#
# On the first failure of an outage (no @down marker yet) the current
# monotonic clock reading is stored in @down, the error is logged and it is
# forwarded to #handle_exception. Later failures in the same outage skip
# that reporting. Every call then sleeps for one second.
#
# Returns nil.
def handle_fetch_exception(ex)
  unless @down
    outage_started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
    @down = outage_started_at
    logger.error("Error fetching job: #{ex}")
    handle_exception(ex)
  end

  sleep(1)
  nil
end
process(work)
click to toggle source
# File lib/sidekiq/processor.rb, line 145
# Parses and executes one fetched unit of work, then acknowledges it when
# appropriate.
#
# work - the fetched unit of work; must respond to #job (the raw payload
#        string), #queue_name and #acknowledge.
#
# Acknowledgement rules:
# * payload is not valid JSON      -> sent to the DeadSet and acknowledged
# * job finished normally          -> acknowledged
# * Sidekiq::JobRetry::Handled     -> acknowledged, underlying error re-raised
# * Sidekiq::Shutdown              -> NOT acknowledged, nothing raised
# * any other exception            -> NOT acknowledged, same exception re-raised
def process(work)
  jobstr = work.job
  queue = work.queue_name

  # Treat malformed JSON as a special case: job goes straight to the morgue.
  job_hash = nil
  begin
    job_hash = Sidekiq.load_json(jobstr)
  rescue => ex
    handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
    # we can't notify because the job isn't a valid hash payload.
    DeadSet.new.kill(jobstr, notify_failure: false)
    return work.acknowledge
  end

  ack = true
  begin
    dispatch(job_hash, queue) do |worker|
      Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
        execute_job(worker, cloned(job_hash['args']))
      end
    end
  rescue Sidekiq::Shutdown
    # Had to force kill this job because it didn't finish
    # within the timeout.  Don't acknowledge the work since
    # we didn't properly finish it.
    ack = false
  rescue Sidekiq::JobRetry::Handled => h
    # this is the common case: job raised error and Sidekiq::JobRetry::Handled
    # signals that we created a retry successfully.  We can acknowledge the job.
    e = h.cause || h
    handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
    raise e
  rescue Exception => ex
    # Unexpected error!  This is very bad and indicates an exception that got past
    # the retry subsystem (e.g. network partition).  We won't acknowledge the job
    # so it can be rescued when using Sidekiq Pro.
    ack = false
    handle_exception(ex, { :context => "Internal exception!", :job => job_hash, :jobstr => jobstr })
    # Re-raise the exception rescued by THIS branch. `e` belongs to the Handled
    # branch above and is nil here, so raising it would replace the real error
    # with "TypeError: exception class/object expected".
    raise ex
  ensure
    work.acknowledge if ack
  end
end
process_one()
click to toggle source
# File lib/sidekiq/processor.rb, line 80
# Runs a single fetch-and-process cycle.
#
# The fetched work is held in @job while it is being processed and the slot
# is reset to nil afterwards. If #process raises, the slot is left set.
#
# Returns nil.
def process_one
  work = fetch
  @job = work
  process(work) if work
  @job = nil
end
run()
click to toggle source
# File lib/sidekiq/processor.rb, line 67
# Main loop of the processor: keeps calling #process_one until the
# processor is flagged as done, then reports back to the manager.
#
# The manager is told the processor stopped when the loop ends normally or
# when Sidekiq::Shutdown is raised. Any other exception is reported through
# processor_died together with the exception.
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => ex
  # Deliberately broad: every failure of the loop is handed to the manager.
  @mgr.processor_died(self, ex)
end
stats(job_hash, queue) { || ... }
click to toggle source
# File lib/sidekiq/processor.rb, line 244
# Tracks a running job in WORKER_STATE for the duration of the given block
# and updates the FAILURE / PROCESSED counters.
#
# job_hash - the job payload stored under :payload in the state entry.
# queue    - the queue stored under :queue in the state entry.
#
# The state entry is keyed by Sidekiq::Logging.tid and removed when the
# block finishes, whether or not it raised. PROCESSED is incremented in
# both cases; FAILURE only when the block raises, and the exception is
# re-raised.
#
# Returns the block's result.
def stats(job_hash, queue)
  thread_id = Sidekiq::Logging.tid
  entry = {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i }
  WORKER_STATE.set(thread_id, entry)

  # Only the block itself is guarded: a failure while registering the state
  # above must not touch the counters.
  begin
    yield
  rescue Exception
    FAILURE.incr
    raise
  ensure
    WORKER_STATE.delete(thread_id)
    PROCESSED.incr
  end
end