module Sidekiq
Sidekiq's Data API provides a Ruby object model on top of Sidekiq's runtime data in Redis. This API should never be used within application code for business logic.
The Sidekiq server process never uses this API: all data manipulation is done directly for performance reasons to ensure we are using Redis as efficiently as possible at every callsite.
This file is designed to be required within the user's deployment script; it should need a bare minimum of dependencies.
  require "sidekiq/metrics/deploy"
  gitdesc = `git log -1 --format="%h %s"`.strip
  d = Sidekiq::Metrics::Deploy.new
  d.mark(label: gitdesc)
Note that you cannot mark more than once per minute. This is a feature, not a bug.
This file contains the components which track execution metrics within Sidekiq.
SdNotify is a pure-Ruby implementation of sd_notify(3). It can be used to notify systemd about state changes. Methods of this package are no-op on non-systemd systems (eg. Darwin).
The API maps closely to the original implementation of sd_notify(3), therefore be sure to check the official man pages prior to using SdNotify.
@see www.freedesktop.org/software/systemd/man/sd_notify.html
Sidekiq's systemd integration allows Sidekiq to inform systemd:
1. when it has successfully started
2. when it is starting shutdown
3. periodically for a liveness check with a watchdog thread
Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer
Constants
- ClientMiddleware
Server-side middleware must import this Module in order to get access to server resources during `call`.
- DEFAULTS
- DEFAULT_ERROR_HANDLER
DEFAULT_ERROR_HANDLER is a constant that allows the default error handler to be referenced. It must be defined here, after the default_error_handler method is defined.
- FAKE_INFO
- Job
Sidekiq::Job is a new alias for Sidekiq::Worker as of Sidekiq 6.3.0. Use `include Sidekiq::Job` rather than `include Sidekiq::Worker`.
The term “worker” is too generic and overly confusing, used in several different contexts meaning different things. Many people call a Sidekiq process a “worker”. Some people call the thread that executes jobs a “worker”. This change brings Sidekiq closer to ActiveJob where your job classes extend ApplicationJob.
- LICENSE
- NAME
- VERSION
- Workers
The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.
WARNING WARNING WARNING
This is live data that can change every millisecond. If you call size => 5 and then expect each to be called 5 times, you're going to have a bad time.
  works = Sidekiq::WorkSet.new
  works.size => 2
  works.each do |process_id, thread_id, work|
    # process_id is a unique identifier per Sidekiq process
    # thread_id is a unique identifier per thread
    # work is a Hash which looks like:
    # { 'queue' => name, 'run_at' => timestamp, 'payload' => job_hash }
    # run_at is an epoch Integer.
  end
Public Class Methods
  # File lib/sidekiq.rb, line 106
  def self.[](key)
    @config[key]
  end

  # File lib/sidekiq.rb, line 110
  def self.[]=(key, val)
    @config[key] = val
  end
How frequently Redis should be checked by a random Sidekiq process for scheduled and retriable jobs. Each individual process will take turns by waiting some multiple of this value.
See sidekiq/scheduled.rb for an in-depth explanation of this value.
  # File lib/sidekiq.rb, line 303
  def self.average_scheduled_poll_interval=(interval)
    self[:average_scheduled_poll_interval] = interval
  end
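As a rough illustration of "waiting some multiple of this value", the sketch below shows one way a process could derive its own sleep time so that the cluster as a whole still checks Redis about once per average interval. The helper name `sketch_poll_sleep`, the scaling by process count, and the ±50% jitter are all assumptions made for illustration; sidekiq/scheduled.rb remains the authoritative description.

```ruby
# Hypothetical helper, not part of Sidekiq's API.
# Assumption: each process scales the average by the number of processes,
# so N processes sleeping N * average still poll once per average overall.
def sketch_poll_sleep(average_interval, process_count, rng: Random.new)
  scaled = average_interval * process_count
  # Assumption: jitter of +/-50% keeps processes from polling in lockstep.
  scaled * rng.rand + scaled / 2.0
end

sleep_for = sketch_poll_sleep(5, 4)
puts "this process would sleep about #{sleep_for.round(1)}s before polling"
```

With an average of 5 seconds and 4 processes, each sleep in this sketch falls between 10 and 30 seconds.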
  # File lib/sidekiq.rb, line 209
  def self.client_middleware
    @client_chain ||= Middleware::Chain.new(self)
    yield @client_chain if block_given?
    @client_chain
  end

  config.concurrency = 5

  # File lib/sidekiq.rb, line 60
  def self.concurrency=(val)
    self[:concurrency] = Integer(val)
  end
Configuration for Sidekiq client, use like:
  Sidekiq.configure_client do |config|
    config.redis = { size: 1, url: 'redis://myhost:8877/0' }
  end

  # File lib/sidekiq.rb, line 151
  def self.configure_client
    yield self unless server?
  end
Configuration for Sidekiq server, use like:
  Sidekiq.configure_server do |config|
    config.server_middleware do |chain|
      chain.add MyServerHook
    end
  end

  # File lib/sidekiq.rb, line 141
  def self.configure_server
    yield self if server?
  end
Death handlers are called when all retries for a job have been exhausted and the job dies. It's the notification to your application that this job will not succeed without manual intervention.
  Sidekiq.configure_server do |config|
    config.death_handlers << ->(job, ex) do
    end
  end

  # File lib/sidekiq.rb, line 250
  def self.death_handlers
    self[:death_handlers]
  end
Private APIs
  # File lib/sidekiq.rb, line 84
  def self.default_error_handler(ex, ctx)
    logger.warn(dump_json(ctx)) unless ctx.empty?
    logger.warn("#{ex.class.name}: #{ex.message}")
    logger.warn(ex.backtrace.join("\n")) unless ex.backtrace.nil?
  end
  # File lib/sidekiq.rb, line 237
  def self.default_job_options
    @default_job_options ||= {"retry" => true, "queue" => "default"}
  end

  # File lib/sidekiq.rb, line 229
  def self.default_job_options=(hash)
    @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
  end
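The setter above merges rather than replaces, and it stringifies keys first. The following standalone sketch reproduces just that merge on a plain Hash to show the effect; the local variables `defaults` and `overrides` are illustrative stand-ins for the module state, and the override values are made up.

```ruby
# Stand-in for the module-level defaults shown in the listing above.
defaults = {"retry" => true, "queue" => "default"}

# Symbol keys are converted to strings before merging, so :queue replaces
# "queue" instead of ending up as a second, separate key.
overrides = {queue: "critical", backtrace: 10}
merged = defaults.merge(overrides.transform_keys(&:to_s))

p merged
```

Keys that are not overridden ("retry" here) keep their default values.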
  # File lib/sidekiq.rb, line 221
  def self.default_server_middleware
    Middleware::Chain.new(self)
  end

  # File lib/sidekiq.rb, line 233
  def self.default_worker_options
    # deprecated
    @default_job_options ||= {"retry" => true, "queue" => "default"}
  end

  # File lib/sidekiq.rb, line 225
  def self.default_worker_options=(hash)
    # deprecated
    @default_job_options = default_job_options.merge(hash.transform_keys(&:to_s))
  end

  # File lib/sidekiq.rb, line 258
  def self.dump_json(object)
    JSON.generate(object)
  end

  # File lib/sidekiq.rb, line 294
  def self.ent?
    defined?(Sidekiq::Enterprise)
  end
Register a proc to handle any error which occurs within the Sidekiq process.
  Sidekiq.configure_server do |config|
    config.error_handlers << proc {|ex,ctx_hash| MyErrorService.notify(ex, ctx_hash) }
  end
The default error handler logs errors to Sidekiq.logger.
  # File lib/sidekiq.rb, line 314
  def self.error_handlers
    self[:error_handlers]
  end
  # File lib/sidekiq.rb, line 118
  def self.fetch(*args, &block)
    @config.fetch(*args, &block)
  end

  # File lib/sidekiq.rb, line 122
  def self.handle_exception(ex, ctx = {})
    self[:error_handlers].each do |handler|
      handler.call(ex, ctx)
    rescue => ex
      logger.error "!!! ERROR HANDLER THREW AN ERROR !!!"
      logger.error ex
      logger.error ex.backtrace.join("\n") unless ex.backtrace.nil?
    end
  end
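The per-handler `rescue` in the listing above means one misbehaving handler does not prevent the others from running. A minimal standalone sketch of that pattern follows; `sketch_handle_exception` and its `handlers:` and `logger:` keyword arguments are hypothetical stand-ins for Sidekiq's module state, and the sketch binds a handler's own failure to a separate variable so later handlers still receive the original exception.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for Sidekiq.handle_exception; handlers and logger
# are passed in explicitly instead of being read from global configuration.
def sketch_handle_exception(ex, ctx = {}, handlers:, logger:)
  handlers.each do |handler|
    handler.call(ex, ctx)
  rescue => handler_error
    # A failing handler is logged and skipped; later handlers still run.
    logger.error "!!! ERROR HANDLER THREW AN ERROR !!!"
    logger.error handler_error
  end
end

seen = []
handlers = [
  ->(ex, ctx) { raise "notifier is down" },        # misbehaving handler
  ->(ex, ctx) { seen << [ex.message, ctx[:jid]] }  # still gets called
]
log = StringIO.new
sketch_handle_exception(RuntimeError.new("boom"), {jid: "abc123"},
  handlers: handlers, logger: Logger.new(log))

puts "handlers that ran to completion: #{seen.size}"
```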
  # File lib/sidekiq.rb, line 254
  def self.load_json(string)
    JSON.parse(string)
  end

  # File lib/sidekiq.rb, line 262
  def self.log_formatter
    @log_formatter ||= if ENV["DYNO"]
      Sidekiq::Logger::Formatters::WithoutTimestamp.new
    else
      Sidekiq::Logger::Formatters::Pretty.new
    end
  end

  # File lib/sidekiq.rb, line 270
  def self.log_formatter=(log_formatter)
    @log_formatter = log_formatter
    logger.formatter = log_formatter
  end

  # File lib/sidekiq.rb, line 275
  def self.logger
    @logger ||= Sidekiq::Logger.new($stdout, level: :info)
  end

  # File lib/sidekiq.rb, line 279
  def self.logger=(logger)
    if logger.nil?
      self.logger.level = Logger::FATAL
      return self.logger
    end
    logger.extend(Sidekiq::LoggingUtils)
    @logger = logger
  end

  # File lib/sidekiq.rb, line 114
  def self.merge!(hash)
    @config.merge!(hash)
  end
Register a block to run at a point in the Sidekiq lifecycle. :startup, :quiet or :shutdown are valid events.
  Sidekiq.configure_server do |config|
    config.on(:shutdown) do
      puts "Goodbye cruel world!"
    end
  end

  # File lib/sidekiq.rb, line 326
  def self.on(event, &block)
    raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
    raise ArgumentError, "Invalid event name: #{event}" unless self[:lifecycle_events].key?(event)
    self[:lifecycle_events][event] << block
  end
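The validation in `on` can be tried outside Sidekiq with a small registry. This is only a sketch: `SketchLifecycle` and its `fire` method are hypothetical, and calling the blocks in registration order is an assumption rather than a statement about how Sidekiq fires events.

```ruby
# Hypothetical registry that mirrors the two ArgumentError checks above.
class SketchLifecycle
  def initialize
    @events = {startup: [], quiet: [], shutdown: []}
  end

  def on(event, &block)
    raise ArgumentError, "Symbols only please: #{event}" unless event.is_a?(Symbol)
    raise ArgumentError, "Invalid event name: #{event}" unless @events.key?(event)
    @events[event] << block
  end

  # Assumption: firing an event calls each registered block in order.
  def fire(event)
    @events.fetch(event).each(&:call)
  end
end

lifecycle = SketchLifecycle.new
messages = []
lifecycle.on(:startup) { messages << "warming cache" }
lifecycle.on(:startup) { messages << "starting metrics" }
lifecycle.fire(:startup)
puts messages
```

Passing a String such as "startup", or a Symbol that is not a registered event, raises ArgumentError in this sketch, matching the two guards in the listing.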
  # File lib/sidekiq.rb, line 96
  def self.options
    logger.warn "`config.options[:key] = value` is deprecated, use `config[:key] = value`: #{caller(1..2)}"
    @config
  end

  # File lib/sidekiq.rb, line 101
  def self.options=(opts)
    logger.warn "config.options = hash` is deprecated, use `config.merge!(hash)`: #{caller(1..2)}"
    @config = opts
  end

  # File lib/sidekiq.rb, line 290
  def self.pro?
    defined?(Sidekiq::Pro)
  end
  config.queues = %w( high default low )                 # strict
  config.queues = %w( high,3 default,2 low,1 )           # weighted
  config.queues = %w( feature1,1 feature2,1 feature3,1 ) # random
With weighted priority, a queue will be checked first (weight / total) of the time. In the weighted example, high will be checked first (3/6) or 50% of the time. I'd recommend setting weights between 1-10. Weights in the hundreds or thousands are ridiculous and unnecessarily expensive. You can get random queue ordering by explicitly setting all weights to 1.
  # File lib/sidekiq.rb, line 73
  def self.queues=(val)
    self[:queues] = Array(val).each_with_object([]) do |qstr, memo|
      name, weight = qstr.split(",")
      self[:strict] = false if weight.to_i > 0
      [weight.to_i, 1].max.times do
        memo << name
      end
    end
  end
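To make the (weight / total) arithmetic concrete, the sketch below repeats the expansion from the listing in a standalone helper and computes each queue's share of the expanded list. `sketch_expand_queues` is a hypothetical name, and reading the share as "how often the queue is checked first" assumes one slot is drawn uniformly at random.

```ruby
# Hypothetical helper mirroring the expansion in the listing above:
# each "name,weight" entry is repeated `weight` times (at least once).
def sketch_expand_queues(val)
  Array(val).each_with_object([]) do |qstr, memo|
    name, weight = qstr.split(",")
    [weight.to_i, 1].max.times { memo << name }
  end
end

expanded = sketch_expand_queues(%w[high,3 default,2 low,1])

# Share of slots per queue; assumption: a uniformly random slot decides
# which queue is checked first.
shares = expanded.tally.transform_values { |n| n.fdiv(expanded.size) }

shares.each { |name, share| puts "#{name}: #{(share * 100).round(1)}%" }
```

This also shows why very large weights are wasteful: a weight of 1000 adds 1000 entries to the list for that queue.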
  # File lib/sidekiq.rb, line 159
  def self.redis
    raise ArgumentError, "requires a block" unless block_given?
    redis_pool.with do |conn|
      retryable = true
      begin
        yield conn
      rescue RedisConnection.adapter::BaseError => ex
        # 2550 Failover can cause the server to become a replica, need
        # to disconnect and reopen the socket to get back to the primary.
        # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
        # 4985 Use the same logic when a blocking command is force-unblocked
        # The same retry logic is also used in client.rb
        if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
          conn.disconnect!
          retryable = false
          retry
        end
        raise
      end
    end
  end
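The retry-once logic above can be exercised without Redis. In this sketch, `SketchConnection`, `SketchConnectionError`, and `sketch_with_retry` are hypothetical stand-ins: the fake connection rejects writes with a READONLY message until it has been disconnected once, on the assumption that reconnecting reaches the new primary.

```ruby
# Hypothetical error and connection classes used only for this sketch.
class SketchConnectionError < StandardError; end

class SketchConnection
  attr_reader :disconnects

  def initialize
    @disconnects = 0
    @primary = false
  end

  def disconnect!
    @disconnects += 1
    @primary = true # assumption: the reconnect lands on the new primary
  end

  def set(_key, _value)
    raise SketchConnectionError, "READONLY You can't write against a read only replica." unless @primary
    "OK"
  end
end

# Same shape as the listing: retry exactly once for failover-style errors,
# re-raise everything else (and any second failure).
def sketch_with_retry(conn)
  retryable = true
  begin
    yield conn
  rescue SketchConnectionError => ex
    if retryable && ex.message =~ /READONLY|NOREPLICAS|UNBLOCKED/
      conn.disconnect!
      retryable = false
      retry
    end
    raise
  end
end

conn = SketchConnection.new
result = sketch_with_retry(conn) { |c| c.set("greeting", "hello") }
puts "result=#{result} disconnects=#{conn.disconnects}"
```

Because `retryable` is flipped before `retry`, a block that keeps failing runs at most twice before the error propagates to the caller.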
  # File lib/sidekiq.rb, line 201
  def self.redis=(hash)
    @redis = if hash.is_a?(ConnectionPool)
      hash
    else
      RedisConnection.create(hash)
    end
  end

  # File lib/sidekiq.rb, line 181
  def self.redis_info
    redis do |conn|
      # admin commands can't go through redis-namespace starting
      # in redis-namespace 2.0
      if conn.respond_to?(:namespace)
        conn.redis.info
      else
        conn.info
      end
    rescue RedisConnection.adapter::CommandError => ex
      # 2850 return fake version when INFO command has (probably) been renamed
      raise unless /unknown command/.match?(ex.message)
      FAKE_INFO
    end
  end

  # File lib/sidekiq.rb, line 197
  def self.redis_pool
    @redis ||= RedisConnection.create
  end

  # File lib/sidekiq.rb, line 155
  def self.server?
    defined?(Sidekiq::CLI)
  end
  # File lib/sidekiq.rb, line 215
  def self.server_middleware
    @server_chain ||= default_server_middleware
    yield @server_chain if block_given?
    @server_chain
  end
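For readers unfamiliar with what a middleware chain does with the classes added to it, here is a miniature version. It is a sketch only: `SketchChain`, `SketchTiming`, and the `invoke` method are hypothetical and far simpler than Sidekiq's Middleware::Chain; the three-argument `call` that yields to the next step is the assumed shape of a server middleware.

```ruby
# Hypothetical miniature chain, written for illustration only.
class SketchChain
  def initialize
    @entries = []
  end

  def add(klass, *args)
    @entries << [klass, args]
  end

  # Wraps the final block in each middleware; the first one added is outermost.
  def invoke(*args, &final)
    stack = @entries.map { |klass, init_args| klass.new(*init_args) }
    traverse = lambda do
      if stack.empty?
        final.call
      else
        stack.shift.call(*args, &traverse)
      end
    end
    traverse.call
  end
end

# Hypothetical middleware: records when a job starts and finishes.
class SketchTiming
  def initialize(log)
    @log = log
  end

  def call(job_instance, job_payload, queue)
    @log << "start #{job_payload["class"]} on #{queue}"
    yield
  ensure
    @log << "finish #{job_payload["class"]}"
  end
end

log = []
chain = SketchChain.new
chain.add SketchTiming, log
result = chain.invoke(nil, {"class" => "HardJob"}, "default") do
  log << "perform"
  :done
end
puts log
```

The `ensure` clause makes the middleware record the finish line even when the wrapped block raises.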
  # File lib/sidekiq/systemd.rb, line 8
  def self.start_watchdog
    usec = Integer(ENV["WATCHDOG_USEC"])
    return Sidekiq.logger.error("systemd Watchdog too fast: " + usec) if usec < 1_000_000

    sec_f = usec / 1_000_000.0
    # "It is recommended that a daemon sends a keep-alive notification message
    # to the service manager every half of the time returned here."
    ping_f = sec_f / 2
    Sidekiq.logger.info "Pinging systemd watchdog every #{ping_f.round(1)} sec"
    Thread.new do
      loop do
        sleep ping_f
        Sidekiq::SdNotify.watchdog
      end
    end
  end
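The interval arithmetic in `start_watchdog` is easy to check in isolation. The helper below, `sketch_watchdog_ping_interval`, is hypothetical: it takes the WATCHDOG_USEC value as an argument and returns nil for timeouts under one second rather than logging. Note that in Ruby, concatenating a String and an Integer with `+` raises TypeError, so a log message for the too-fast case would need interpolation or `to_s`.

```ruby
# Hypothetical helper: returns the ping interval in seconds, or nil when the
# watchdog timeout is under one second (treated as too fast in the listing).
def sketch_watchdog_ping_interval(watchdog_usec)
  usec = Integer(watchdog_usec)
  return nil if usec < 1_000_000

  sec_f = usec / 1_000_000.0
  sec_f / 2 # ping at half the timeout, per the recommendation quoted above
end

# Example value: a 30 second watchdog timeout expressed in microseconds.
interval = sketch_watchdog_ping_interval("30000000")
puts "Pinging systemd watchdog every #{interval.round(1)} sec"
```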
  # File lib/sidekiq.rb, line 332
  def self.strict_args!(mode = :raise)
    self[:on_complex_arguments] = mode
  end
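The listing only stores the mode, so the sketch below illustrates what a "complex argument" check could look like. Everything here is an assumption made for illustration: the helper names, the JSON round-trip test, the error message, and the `:warn` and `false` modes alongside the `:raise` default shown above.

```ruby
require "json"

# Hypothetical check, not Sidekiq's implementation: arguments count as
# simple when a JSON round trip returns an equal value.
def sketch_json_safe?(args)
  JSON.parse(JSON.generate(args)) == args
end

# Assumed modes: :raise raises, :warn prints to stderr, false does nothing.
def sketch_verify_args(args, mode: :raise)
  return true if sketch_json_safe?(args)

  message = "Job arguments must be native JSON types: #{args.inspect}"
  case mode
  when :raise then raise ArgumentError, message
  when :warn then warn message
  end
  false
end

puts sketch_json_safe?([1, "two", {"three" => [true, nil]}])
puts sketch_json_safe?([{user_id: 1}]) # symbol keys come back as strings
```

Symbols and symbol-keyed hashes fail the round trip because JSON has no symbol type; they come back as strings.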
  # File lib/sidekiq/transaction_aware_client.rb, line 33
  def self.transactional_push!
    begin
      require "after_commit_everywhere"
    rescue LoadError
      Sidekiq.logger.error("You need to add after_commit_everywhere to your Gemfile to use Sidekiq's transactional client")
      raise
    end
    default_job_options["client_class"] = Sidekiq::TransactionAwareClient
    Sidekiq::JobUtil::TRANSIENT_ATTRIBUTES << "client_class"
    true
  end

  # File lib/sidekiq.rb, line 55
  def self.❨╯°□°❩╯︵┻━┻
    puts "Calm down, yo."
  end