module Sidekiq::Worker::ClassMethods
The Sidekiq testing infrastructure overrides perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.
This is similar to ActionMailer's :test delivery_method and its ActionMailer::Base.deliveries array.
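The idea can be sketched without loading Sidekiq at all. The following is a minimal, standalone illustration, not the library's implementation: FakeQueue and SketchWorker are hypothetical names, and the stored job hash is simplified.

```ruby
# Standalone sketch: does not load Sidekiq.
# FakeQueue and SketchWorker are hypothetical names used for illustration.
module FakeQueue
  # Each class that extends this module gets its own in-memory job list.
  def jobs
    @jobs ||= []
  end

  # Record the job in the per-class array instead of pushing it anywhere.
  def perform_async(*args)
    job = { 'class' => name, 'args' => args }
    jobs << job
    job
  end

  # Forget every recorded job for this class.
  def clear
    jobs.clear
  end
end

class SketchWorker
  extend FakeQueue
end

SketchWorker.perform_async(:something)
puts SketchWorker.jobs.size                  # prints 1
p SketchWorker.jobs[0]['args'][0]            # prints :something
```

Because the array lives on the class, a test can assert on one worker's jobs without seeing jobs recorded for another.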
Example:
    require 'sidekiq/testing'

    assert_equal 0, HardWorker.jobs.size
    HardWorker.perform_async(:something)
    assert_equal 1, HardWorker.jobs.size
    assert_equal :something, HardWorker.jobs[0]['args'][0]

    assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
    MyMailer.delay.send_welcome_email('foo@example.com')
    assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
You can also clear and drain all workers' jobs:
    assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
    assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

    MyMailer.delay.send_welcome_email('foo@example.com')
    MyModel.delay.do_something_hard

    assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
    assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size

    Sidekiq::Worker.clear_all # or .drain_all

    assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
    assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
This can be useful to make sure jobs don't linger between tests:
    RSpec.configure do |config|
      config.before(:each) do
        Sidekiq::Worker.clear_all
      end
    end
or for acceptance testing, e.g. with Cucumber:

    AfterStep do
      Sidekiq::Worker.drain_all
    end

    When I sign up as "foo@example.com"
    Then I should receive a welcome email to "foo@example.com"
Constants
- ACCESSOR_MUTEX
Public Instance Methods
Clear all jobs for this worker
    # File lib/sidekiq/testing.rb, line 270
    def clear
      Queues.clear_for(queue, self.to_s)
    end

    # File lib/sidekiq/worker.rb, line 76
    def delay(*args)
      raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
    end

    # File lib/sidekiq/worker.rb, line 80
    def delay_for(*args)
      raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
    end

    # File lib/sidekiq/worker.rb, line 84
    def delay_until(*args)
      raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
    end
Drain and run all jobs for this worker
    # File lib/sidekiq/testing.rb, line 275
    def drain
      while jobs.any?
        next_job = jobs.first
        Queues.delete_for(next_job["jid"], next_job["queue"], self.to_s)
        process_job(next_job)
      end
    end
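Because drain re-checks the queue on every pass, jobs that are enqueued while draining are run as well. A minimal standalone sketch of that behaviour follows; it does not load Sidekiq, and CountdownWorker, its class-level queue, and the performed list are hypothetical stand-ins.

```ruby
# Standalone sketch: does not load Sidekiq.
# CountdownWorker and its in-memory queue are hypothetical.
class CountdownWorker
  class << self
    # In-memory job list for this class.
    def jobs
      @jobs ||= []
    end

    # Record of arguments that have been performed, in order.
    def performed
      @performed ||= []
    end

    def perform_async(*args)
      jobs << { 'args' => args }
    end

    # Same shape as the loop above: keep going until the queue is empty,
    # removing each job before running it.
    def drain
      while jobs.any?
        next_job = jobs.shift
        new.perform(*next_job['args'])
      end
    end
  end

  def perform(n)
    self.class.performed << n
    # Enqueue a follow-up job while the queue is being drained.
    self.class.perform_async(n - 1) if n > 0
  end
end

CountdownWorker.perform_async(2)
CountdownWorker.drain
p CountdownWorker.performed   # prints [2, 1, 0]
```

One consequence is that a job which always re-enqueues itself would keep this loop running indefinitely.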
    # File lib/sidekiq/testing.rb, line 300
    def execute_job(worker, args)
      worker.perform(*args)
    end
Jobs queued for this worker
    # File lib/sidekiq/testing.rb, line 265
    def jobs
      Queues.jobs_by_worker[self.to_s]
    end

    # File lib/sidekiq/worker.rb, line 92
    def perform_async(*args)
      client_push('class' => self, 'args' => args)
    end
interval must be a timestamp, a numeric value, or something that acts numeric (like an ActiveSupport time interval).
    # File lib/sidekiq/worker.rb, line 98
    def perform_in(interval, *args)
      int = interval.to_f
      now = Time.now.to_f
      ts = (int < 1_000_000_000 ? now + int : int)

      item = { 'class' => self, 'args' => args, 'at' => ts }

      # Optimization to enqueue something now that is scheduled to go out now or in the past
      item.delete('at') if ts <= now

      client_push(item)
    end
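The interval handling above can be tried in isolation. The sketch below copies only the timestamp arithmetic into a hypothetical helper, schedule_item, and takes the current time as a parameter so the result is deterministic; it pushes nothing and does not load Sidekiq.

```ruby
# Standalone sketch of the timestamp arithmetic only.
# schedule_item is a hypothetical helper, not part of Sidekiq.
def schedule_item(interval, now = Time.now.to_f)
  int = interval.to_f
  # Values below 1_000_000_000 are treated as a delay in seconds;
  # anything larger is treated as an absolute epoch timestamp.
  ts = (int < 1_000_000_000 ? now + int : int)

  item = { 'at' => ts }
  # A time that is not in the future means "enqueue right away".
  item.delete('at') if ts <= now
  item
end

now = 1_700_000_000.0

puts schedule_item(300, now)['at']        # delay in seconds: now + 300
puts schedule_item(now + 60, now)['at']   # absolute timestamp, kept as-is
p schedule_item(now - 60, now).key?('at') # past timestamp: prints false
p schedule_item(0, now).key?('at')        # zero delay: prints false
```

The same method therefore accepts both "in five minutes" and "at this epoch time", with the 1_000_000_000 threshold deciding which reading applies.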
Pop out a single job and perform it
    # File lib/sidekiq/testing.rb, line 284
    def perform_one
      raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
      next_job = jobs.first
      Queues.delete_for(next_job["jid"], queue, self.to_s)
      process_job(next_job)
    end

    # File lib/sidekiq/testing.rb, line 291
    def process_job(job)
      worker = new
      worker.jid = job['jid']
      worker.bid = job['bid'] if worker.respond_to?(:bid=)
      Sidekiq::Testing.server_middleware.invoke(worker, job, job['queue']) do
        execute_job(worker, job['args'])
      end
    end
Queue for this worker
    # File lib/sidekiq/testing.rb, line 260
    def queue
      self.sidekiq_options["queue"]
    end

    # File lib/sidekiq/worker.rb, line 88
    def set(options)
      Setter.new(self, options)
    end
    # File lib/sidekiq/worker.rb, line 152
    def sidekiq_class_attribute(*attrs)
      instance_reader = true
      instance_writer = true

      attrs.each do |name|
        synchronized_getter = "__synchronized_#{name}"

        singleton_class.instance_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
        end

        define_singleton_method(synchronized_getter) { nil }
        singleton_class.class_eval do
          private(synchronized_getter)
        end

        define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }

        ivar = "@#{name}"

        singleton_class.instance_eval do
          m = "#{name}="
          undef_method(m) if method_defined?(m) || private_method_defined?(m)
        end
        define_singleton_method("#{name}=") do |val|
          singleton_class.class_eval do
            ACCESSOR_MUTEX.synchronize do
              undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
              define_method(synchronized_getter) { val }
            end
          end

          if singleton_class?
            class_eval do
              undef_method(name) if method_defined?(name) || private_method_defined?(name)
              define_method(name) do
                if instance_variable_defined? ivar
                  instance_variable_get ivar
                else
                  singleton_class.send name
                end
              end
            end
          end
          val
        end

        if instance_reader
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined?(ivar)
              instance_variable_get ivar
            else
              self.class.public_send name
            end
          end
        end

        if instance_writer
          m = "#{name}="
          undef_method(m) if method_defined?(m) || private_method_defined?(m)
          attr_writer name
        end
      end
    end
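The core effect of the method above is an inheritable class-level attribute: a subclass sees its parent's value until it assigns its own. The following is a heavily simplified standalone sketch of that one property. SimpleClassAttribute, Base and Child are hypothetical names, and the mutex, the private getter, and the instance-level reader and writer from the real method are deliberately left out.

```ruby
# Standalone, simplified sketch: no mutex, no instance reader/writer.
# SimpleClassAttribute, Base and Child are hypothetical names.
module SimpleClassAttribute
  def simple_class_attribute(name)
    # Reader returns nil until something is assigned.
    define_singleton_method(name) { nil }

    define_singleton_method("#{name}=") do |val|
      # Redefine the reader on the class that received the assignment.
      # Subclasses inherit it; the parent's reader is left untouched.
      define_singleton_method(name) { val }
      val
    end
  end
end

class Base
  extend SimpleClassAttribute
  simple_class_attribute :opts
end

class Child < Base; end

Base.opts = { 'queue' => 'default' }
p Child.opts['queue']   # inherited from Base: prints "default"

Child.opts = { 'queue' => 'low' }
p Child.opts['queue']   # overridden on Child: prints "low"
p Base.opts['queue']    # Base is unaffected: prints "default"
```

Storing the value in a redefined method rather than in a class-level instance variable is what lets subclasses pick it up through ordinary method lookup.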
Allows customization for this type of Worker. Legal options:
    queue - use a named queue for this Worker, default 'default'
    retry - enable the RetryJobs middleware for this Worker, *true* to use the default or *Integer* count
    backtrace - whether to save any error backtrace in the retry payload to display in web UI,
        can be true, false or an integer number of lines to save, default *false*
    pool - use the given Redis connection pool to push this type of job to a given shard.
In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.
    # File lib/sidekiq/worker.rb, line 125
    def sidekiq_options(opts={})
      # stringify
      self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map{|k, v| [k.to_s, v]}])
    end
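The stringify-and-merge step above is why options given with symbol keys override existing string-keyed ones. A minimal standalone sketch of just that step follows; merge_options is a hypothetical helper, and the contents of DEFAULTS are assumed values for illustration, not read from Sidekiq.

```ruby
# Standalone sketch of the stringify-and-merge step only.
# merge_options is hypothetical; DEFAULTS holds assumed example values.
DEFAULTS = { 'retry' => true, 'queue' => 'default' }.freeze

def merge_options(current, opts = {})
  # Convert every key to a String so :queue and 'queue' name the same option,
  # then let the new values win over the current ones.
  current.merge(Hash[opts.map { |k, v| [k.to_s, v] }])
end

options = merge_options(DEFAULTS, queue: 'critical', backtrace: 10)

p options['queue']       # prints "critical"
p options['retry']       # prints true
p options['backtrace']   # prints 10
p options.key?(:queue)   # prints false: only string keys are stored
```

Since merge returns a new hash, the existing options are not mutated; options that were not passed keep their current values.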
    # File lib/sidekiq/worker.rb, line 134
    def sidekiq_retries_exhausted(&block)
      self.sidekiq_retries_exhausted_block = block
    end

    # File lib/sidekiq/worker.rb, line 130
    def sidekiq_retry_in(&block)
      self.sidekiq_retry_in_block = block
    end