module Sidekiq::Worker::ClassMethods

The Sidekiq testing infrastructure overrides #perform_async so that it does not actually touch the network. Instead it stores the asynchronous jobs in a per-class array so that their presence/absence can be asserted by your tests.

This is similar to ActionMailer's :test delivery_method and its ActionMailer::Base.deliveries array.

Example:

require 'sidekiq/testing'

assert_equal 0, HardWorker.jobs.size
HardWorker.perform_async(:something)
assert_equal 1, HardWorker.jobs.size
assert_equal :something, HardWorker.jobs[0]['args'][0]

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
MyMailer.delay.send_welcome_email('foo@example.com')
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size

You can also clear and drain all workers' jobs:

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

MyMailer.delay.send_welcome_email('foo@example.com')
MyModel.delay.do_something_hard

assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size

Sidekiq::Worker.clear_all # or .drain_all

assert_equal 0, Sidekiq::Extensions::DelayedMailer.jobs.size
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size

This can be useful to make sure jobs don't linger between tests:

RSpec.configure do |config|
  config.before(:each) do
    Sidekiq::Worker.clear_all
  end
end

or for acceptance testing, e.g. with cucumber:

AfterStep do
  Sidekiq::Worker.drain_all
end

When I sign up as "foo@example.com"
Then I should receive a welcome email to "foo@example.com"

Constants

ACCESSOR_MUTEX

Public Instance Methods

clear() click to toggle source

Clear all jobs for this worker

# File lib/sidekiq/testing.rb, line 269
# Discard every queued job for this worker by delegating to Queues.clear_for
# with this worker's queue name and class name.
def clear
  queue_name = queue
  Queues.clear_for(queue_name, to_s)
end
delay(*args) click to toggle source
# File lib/sidekiq/worker.rb, line 75
# Always raises ArgumentError, directing the caller to perform_async instead.
# Any arguments are accepted and ignored.
def delay(*args)
  message = "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
  raise ArgumentError, message
end
delay_for(*args) click to toggle source
# File lib/sidekiq/worker.rb, line 79
# Always raises ArgumentError, directing the caller to perform_in instead.
# Any arguments are accepted and ignored.
def delay_for(*args)
  message = "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
  raise ArgumentError, message
end
delay_until(*args) click to toggle source
# File lib/sidekiq/worker.rb, line 83
# Always raises ArgumentError, directing the caller to perform_at instead.
# Any arguments are accepted and ignored.
def delay_until(*args)
  message = "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
  raise ArgumentError, message
end
drain() click to toggle source

Drain and run all jobs for this worker

# File lib/sidekiq/testing.rb, line 274
# Run every queued job for this worker, one at a time, until none remain.
# Each job is removed from its queue (looked up by the job's own "jid" and
# "queue" entries) before it is processed, and `jobs` is re-read on every
# pass, so jobs enqueued while draining are picked up as well.
def drain
  until jobs.none?
    job = jobs.first
    jid = job["jid"]
    queue_name = job["queue"]
    Queues.delete_for(jid, queue_name, to_s)
    process_job(job)
  end
end
execute_job(worker, args) click to toggle source
# File lib/sidekiq/testing.rb, line 299
# Runs a job by calling +perform+ on the given worker instance, passing the
# elements of +args+ as individual positional arguments. Returns whatever
# +perform+ returns.
def execute_job(worker, args)
  worker.perform(*args)
end
jobs() click to toggle source

Jobs queued for this worker

# File lib/sidekiq/testing.rb, line 264
# Look up this worker's queued jobs in Queues.jobs_by_worker, keyed by the
# worker's class name.
def jobs
  by_worker = Queues.jobs_by_worker
  by_worker[to_s]
end
perform_async(*args) click to toggle source
# File lib/sidekiq/worker.rb, line 91
# Enqueue a job for this worker with the given arguments by handing a
# 'class'/'args' payload to client_push. Returns client_push's result.
def perform_async(*args)
  payload = { 'class' => self, 'args' => args }
  client_push(payload)
end
perform_at(interval, *args)
Alias for: perform_in
perform_in(interval, *args) click to toggle source

interval must be a timestamp, numeric or something that acts numeric (like an activesupport time interval).
# File lib/sidekiq/worker.rb, line 97
# Enqueue a job to run at a later time.
#
# +interval+ is converted with to_f. A value below 1_000_000_000 is treated as
# a number of seconds from now; anything else is used as an absolute epoch
# timestamp. The resulting time is sent as the payload's 'at' entry.
def perform_in(interval, *args)
  requested = interval.to_f
  current = Time.now.to_f
  run_at = requested < 1_000_000_000 ? current + requested : requested

  payload = { 'class' => self, 'args' => args }

  # Optimization: a job due now or in the past is enqueued immediately, so it
  # carries no 'at' entry.
  payload['at'] = run_at unless run_at <= current

  client_push(payload)
end
Also aliased as: perform_at
perform_one() click to toggle source

Pop out a single job and perform it

# File lib/sidekiq/testing.rb, line 283
# Pop the first queued job for this worker and run it.
#
# The job is removed from the queue named in its own "queue" entry, which is
# the same lookup +drain+ uses. Relying on the worker's default +queue+ here
# would miss a job that was pushed to a different queue.
#
# Returns the result of process_job.
# Raises EmptyQueueError when there are no queued jobs for this worker.
def perform_one
  raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
  next_job = jobs.first
  Queues.delete_for(next_job["jid"], next_job["queue"], to_s)
  process_job(next_job)
end
process_job(job) click to toggle source
# File lib/sidekiq/testing.rb, line 290
# Instantiate this worker, copy the job's 'jid' (and 'bid', when the worker
# has a bid= writer) onto it, then run the job inside the testing server
# middleware chain. Returns whatever the middleware chain's invoke returns.
def process_job(job)
  instance = new
  instance.jid = job['jid']
  if instance.respond_to?(:bid=)
    instance.bid = job['bid']
  end

  chain = Sidekiq::Testing.server_middleware
  chain.invoke(instance, job, job['queue']) { execute_job(instance, job['args']) }
end
queue() click to toggle source

Queue for this worker

# File lib/sidekiq/testing.rb, line 259
# Name of the queue for this worker, read from the "queue" entry of
# sidekiq_options.
def queue
  options = self.sidekiq_options
  options["queue"]
end
set(options) click to toggle source
# File lib/sidekiq/worker.rb, line 87
# Returns a new Setter built from this worker class and the given +options+.
# NOTE(review): presumably the Setter applies these options to a single push
# (e.g. a different queue) — confirm against Setter's definition.
def set(options)
  Setter.new(self, options)
end
sidekiq_class_attribute(*attrs) click to toggle source
# File lib/sidekiq/worker.rb, line 151
# Defines a class-level attribute for each name in +attrs+, together with
# instance-level reader and writer methods.
#
# The class-level reader takes ACCESSOR_MUTEX and calls a private singleton
# method named "__synchronized_<name>", which initially returns nil. The
# class-level writer redefines that private method on the receiver's singleton
# class, under the same mutex, so that it returns the assigned value.
#
# Instance readers return the instance variable @<name> when it is defined and
# otherwise fall back to the class-level reader.
#
# NOTE(review): this looks modelled on ActiveSupport's class_attribute — confirm.
def sidekiq_class_attribute(*attrs)
  # Both flags are fixed to true here, so the instance reader and writer
  # sections below always run.
  instance_reader = true
  instance_writer = true

  attrs.each do |name|
    # Name of the singleton method that returns the current value.
    synchronized_getter = "__synchronized_#{name}"

    # Drop any existing class-level reader before redefining it.
    singleton_class.instance_eval do
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
    end

    # The value holder starts out returning nil and is made private.
    define_singleton_method(synchronized_getter) { nil }
    singleton_class.class_eval do
      private(synchronized_getter)
    end

    # Class-level reader: calls the value holder while holding ACCESSOR_MUTEX.
    define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }

    ivar = "@#{name}"

    # Drop any existing class-level writer before redefining it.
    singleton_class.instance_eval do
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
    end
    # Class-level writer; returns the assigned value.
    define_singleton_method("#{name}=") do |val|
      # Replace the value holder on the receiver's singleton class, under the
      # mutex, with a method that returns val.
      singleton_class.class_eval do
        ACCESSOR_MUTEX.synchronize do
          undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
          define_method(synchronized_getter) { val }
        end
      end

      # When the receiver is itself a singleton class, also (re)define an
      # instance reader on it that prefers the instance variable and falls
      # back to the object's singleton class reader.
      if singleton_class?
        class_eval do
          undef_method(name) if method_defined?(name) || private_method_defined?(name)
          define_method(name) do
            if instance_variable_defined? ivar
              instance_variable_get ivar
            else
              singleton_class.send name
            end
          end
        end
      end
      val
    end

    # Instance reader: the instance variable when set, otherwise the
    # class-level value.
    if instance_reader
      undef_method(name) if method_defined?(name) || private_method_defined?(name)
      define_method(name) do
        if instance_variable_defined?(ivar)
          instance_variable_get ivar
        else
          self.class.public_send name
        end
      end
    end

    # Instance writer: a plain attr_writer, which sets the instance variable
    # that the instance reader checks first.
    if instance_writer
      m = "#{name}="
      undef_method(m) if method_defined?(m) || private_method_defined?(m)
      attr_writer name
    end
  end
end
sidekiq_options(opts={}) click to toggle source

Allows customization for this type of Worker. Legal options:

queue - use a named queue for this Worker, default 'default'
retry - enable the RetryJobs middleware for this Worker, *true* to use the default
   or *Integer* count
backtrace - whether to save any error backtrace in the retry payload to display in web UI,
   can be true, false or an integer number of lines to save, default *false*
pool - use the given Redis connection pool to push this type of job to a given shard.

In practice, any option is allowed. This is the main mechanism to configure the options for a specific job.

# File lib/sidekiq/worker.rb, line 124
# Merge +opts+ (with every key converted to a String) over the options
# returned by get_sidekiq_options, store the result in sidekiq_options_hash,
# and return the merged hash.
def sidekiq_options(opts={})
  current = get_sidekiq_options
  # Keys are stringified so symbol and string keys address the same option.
  overrides = opts.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
  self.sidekiq_options_hash = current.merge(overrides)
end
sidekiq_retries_exhausted(&block) click to toggle source
# File lib/sidekiq/worker.rb, line 133
# Stores the given block in sidekiq_retries_exhausted_block (nil when no block
# is given).
# NOTE(review): presumably the block is called once a job has used up its
# retries — confirm against the retry middleware.
def sidekiq_retries_exhausted(&block)
  self.sidekiq_retries_exhausted_block = block
end
sidekiq_retry_in(&block) click to toggle source
# File lib/sidekiq/worker.rb, line 129
# Stores the given block in sidekiq_retry_in_block (nil when no block is
# given).
# NOTE(review): presumably the block computes the delay before the next retry
# — confirm against the retry middleware.
def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end