module Sidekiq::Worker::ClassMethods
The Sidekiq
testing infrastructure overrides perform_async
so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.
This is similar to ActionMailer's :test delivery_method and its ActionMailer::Base.deliveries array.
Example:
require 'sidekiq/testing' assert_equal 0, HardWorker.jobs.size HardWorker.perform_async(:something) assert_equal 1, HardWorker.jobs.size assert_equal :something, HardWorker.jobs[0]['args'][0] assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size MyMailer.delay.send_welcome_email('foo@example.com') assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
You can also clear and drain all workers' jobs:
assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size MyMailer.delay.send_welcome_email('foo@example.com') MyModel.delay.do_something_hard assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size Sidekiq::Worker.clear_all # or .drain_all assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size
This can be useful to make sure jobs don't linger between tests:
RSpec.configure do |config| config.before(:each) do Sidekiq::Worker.clear_all end end
or for acceptance testing, i.e. with cucumber:
AfterStep do Sidekiq::Worker.drain_all end When I sign up as "foo@example.com" Then I should receive a welcome email to "foo@example.com"
Public Instance Methods
Clear all jobs for this worker
# File lib/sidekiq/testing.rb, line 269 def clear Queues.clear_for(queue, to_s) end
# File lib/sidekiq/worker.rb, line 220 def delay(*args) raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async" end
# File lib/sidekiq/worker.rb, line 224 def delay_for(*args) raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in" end
# File lib/sidekiq/worker.rb, line 228 def delay_until(*args) raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at" end
Drain and run all jobs for this worker
# File lib/sidekiq/testing.rb, line 274 def drain while jobs.any? next_job = jobs.first Queues.delete_for(next_job["jid"], next_job["queue"], to_s) process_job(next_job) end end
# File lib/sidekiq/testing.rb, line 299 def execute_job(worker, args) worker.perform(*args) end
Jobs queued for this worker
# File lib/sidekiq/testing.rb, line 264 def jobs Queues.jobs_by_worker[to_s] end
# File lib/sidekiq/worker.rb, line 240 def perform_async(*args) client_push("class" => self, "args" => args) end
Push a large number of jobs to Redis, while limiting the batch of each job payload to 1,000. This method helps cut down on the number of round trips to Redis, which can increase the performance of enqueueing large numbers of jobs.
items
must be an Array of Arrays.
For finer-grained control, use `Sidekiq::Client.push_bulk` directly.
Example (3 Redis round trips):
SomeWorker.perform_async(1) SomeWorker.perform_async(2) SomeWorker.perform_async(3)
Would instead become (1 Redis round trip):
SomeWorker.perform_bulk([[1], [2], [3]])
# File lib/sidekiq/worker.rb, line 264 def perform_bulk(items, batch_size: 1_000) items.each_slice(batch_size).flat_map do |slice| Sidekiq::Client.push_bulk("class" => self, "args" => slice) end end
interval
must be a timestamp, numeric or something that acts
numeric (like an activesupport time interval).
# File lib/sidekiq/worker.rb, line 272 def perform_in(interval, *args) int = interval.to_f now = Time.now.to_f ts = (int < 1_000_000_000 ? now + int : int) item = {"class" => self, "args" => args} # Optimization to enqueue something now that is scheduled to go out now or in the past item["at"] = ts if ts > now client_push(item) end
Pop out a single job and perform it
# File lib/sidekiq/testing.rb, line 283 def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? next_job = jobs.first Queues.delete_for(next_job["jid"], queue, to_s) process_job(next_job) end
# File lib/sidekiq/testing.rb, line 290 def process_job(job) worker = new worker.jid = job["jid"] worker.bid = job["bid"] if worker.respond_to?(:bid=) Sidekiq::Testing.server_middleware.invoke(worker, job, job["queue"]) do execute_job(worker, job["args"]) end end
Queue
for this worker
# File lib/sidekiq/testing.rb, line 259 def queue get_sidekiq_options["queue"] end
# File lib/sidekiq/worker.rb, line 232 def queue_as(q) sidekiq_options("queue" => q.to_s) end
# File lib/sidekiq/worker.rb, line 236 def set(options) Setter.new(self, options) end
Allows customization for this type of Worker
. Legal options:
queue - use a named queue for this Worker, default 'default' retry - enable the RetryJobs middleware for this Worker, *true* to use the default or *Integer* count backtrace - whether to save any error backtrace in the retry payload to display in web UI, can be true, false or an integer number of lines to save, default *false* pool - use the given Redis connection pool to push this type of job to a given shard.
In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.
# File lib/sidekiq/worker.rb, line 299 def sidekiq_options(opts = {}) super end