class Sidekiq::Stats

Public Class Methods

new() click to toggle source
# File lib/sidekiq/api.rb, line 10
# Builds a stats snapshot. Only the cheap statistics are loaded up front via
# fetch_stats_fast!; values that are still missing are loaded on demand when
# a reader goes through the private #stat helper.
def initialize
  fetch_stats_fast!
end

Public Instance Methods

dead_size() click to toggle source
# File lib/sidekiq/api.rb, line 30
# Cardinality of the "dead" sorted set as captured by fetch_stats_fast!.
#
# @return [Integer]
def dead_size
  stat(:dead_size)
end
default_queue_latency() click to toggle source
# File lib/sidekiq/api.rb, line 46
# Seconds between the "enqueued_at" of the last entry in the "queue:default"
# list and the moment the snapshot was taken, as computed by
# fetch_stats_fast!; 0 when that list is empty.
#
# @return [Numeric]
def default_queue_latency
  stat(:default_queue_latency)
end
enqueued() click to toggle source
# File lib/sidekiq/api.rb, line 34
# Total length of all "queue:<name>" lists. This value is produced by
# fetch_stats_slow!, which #stat triggers when it has not been loaded yet.
#
# @return [Integer]
def enqueued
  stat(:enqueued)
end
failed() click to toggle source
# File lib/sidekiq/api.rb, line 18
# Value of the "stat:failed" counter, converted to an Integer by
# fetch_stats_fast!.
#
# @return [Integer]
def failed
  stat(:failed)
end
fetch_stats!() click to toggle source
# File lib/sidekiq/api.rb, line 119
# Reloads every statistic. The fast set runs first because it replaces
# @stats wholesale; the slow set then adds :workers_size and :enqueued to
# that hash.
#
# @return [Hash] the result of fetch_stats_slow!, i.e. the @stats hash
def fetch_stats!
  fetch_stats_fast!
  fetch_stats_slow!
end
fetch_stats_fast!() click to toggle source

O(1) Redis calls: a fixed set of commands sent in a single pipelined round trip.

# File lib/sidekiq/api.rb, line 55
# Loads the cheap statistics with a fixed set of commands sent in one
# pipelined round trip, and replaces @stats with the result (any previously
# loaded :workers_size / :enqueued values are dropped).
#
# Populates :processed, :failed, :scheduled_size, :retry_size, :dead_size,
# :processes_size and :default_queue_latency.
#
# @return [Hash] the freshly built @stats hash
def fetch_stats_fast!
  pipe1_res = Sidekiq.redis do |conn|
    # Queue the commands on the object yielded by #pipelined rather than on
    # the connection: calling them on the connection inside the block is
    # deprecated in redis-rb 4.6 and no longer pipelines in 5.0. Older
    # redis-rb versions yield the connection itself, so this stays compatible.
    conn.pipelined do |pipeline|
      pipeline.get("stat:processed")
      pipeline.get("stat:failed")
      pipeline.zcard("schedule")
      pipeline.zcard("retry")
      pipeline.zcard("dead")
      pipeline.scard("processes")
      # Only the last element of the default queue's list is needed.
      pipeline.lrange("queue:default", -1, -1)
    end
  end

  default_queue_latency = if (entry = pipe1_res[6].first)
    job = begin
      Sidekiq.load_json(entry)
    rescue
      # An unparsable payload is treated as a job without a timestamp.
      {}
    end
    now = Time.now.to_f
    # A missing "enqueued_at" yields a latency of zero.
    thence = job["enqueued_at"] || now
    now - thence
  else
    0
  end

  @stats = {
    # GET returns a String (or nil when the key is unset); to_i maps both
    # onto an Integer.
    processed: pipe1_res[0].to_i,
    failed: pipe1_res[1].to_i,
    scheduled_size: pipe1_res[2],
    retry_size: pipe1_res[3],
    dead_size: pipe1_res[4],
    processes_size: pipe1_res[5],

    default_queue_latency: default_queue_latency
  }
end
fetch_stats_slow!() click to toggle source

O(number of processes + number of queues) Redis commands: two set scans plus one pipelined command per process and per queue.

# File lib/sidekiq/api.rb, line 94
# Loads the statistics whose cost grows with the number of processes and
# queues, and merges them into the existing @stats hash:
#
# * :workers_size - sum of the "busy" hash field over every member of the
#   "processes" set
# * :enqueued     - sum of the lengths of every "queue:<name>" list named in
#   the "queues" set
#
# Expects @stats to have been initialised already (fetch_stats_fast! does so).
#
# @return [Hash] the updated @stats hash
def fetch_stats_slow!
  processes = Sidekiq.redis { |conn| conn.sscan_each("processes").to_a }
  queues = Sidekiq.redis { |conn| conn.sscan_each("queues").to_a }

  pipe2_res = Sidekiq.redis do |conn|
    # Queue the commands on the object yielded by #pipelined rather than on
    # the connection: calling them on the connection inside the block is
    # deprecated in redis-rb 4.6 and no longer pipelines in 5.0. Older
    # redis-rb versions yield the connection itself, so this stays compatible.
    conn.pipelined do |pipeline|
      processes.each { |key| pipeline.hget(key, "busy") }
      queues.each { |queue| pipeline.llen("queue:#{queue}") }
    end
  end

  # Replies arrive in command order: one per process, then one per queue.
  s = processes.size
  # to_i turns a nil reply (no "busy" field under that key) into 0.
  workers_size = pipe2_res[0...s].sum(&:to_i)
  enqueued = pipe2_res[s..-1].sum(&:to_i)

  @stats[:workers_size] = workers_size
  @stats[:enqueued] = enqueued
  @stats
end
processed() click to toggle source
# File lib/sidekiq/api.rb, line 14
# Value of the "stat:processed" counter, converted to an Integer by
# fetch_stats_fast!.
#
# @return [Integer]
def processed
  stat(:processed)
end
processes_size() click to toggle source
# File lib/sidekiq/api.rb, line 38
# Cardinality of the "processes" set as captured by fetch_stats_fast!.
#
# @return [Integer]
def processes_size
  stat(:processes_size)
end
queues() click to toggle source
# File lib/sidekiq/api.rb, line 50
# Delegates to a fresh Sidekiq::Stats::Queues instance and returns whatever
# its #lengths reports. Nothing is cached in @stats.
def queues
  queue_stats = Sidekiq::Stats::Queues.new
  queue_stats.lengths
end
reset(*stats) click to toggle source
# File lib/sidekiq/api.rb, line 124
# Sets the "stat:failed" and/or "stat:processed" counters back to 0.
#
# @param stats [Array<String, Symbol, Array>] names of the counters to reset;
#   nested arrays are flattened, nils dropped and unknown names ignored.
#   With no arguments both counters are reset.
# @return [Object, nil] the reply of the MSET call, or nil when none of the
#   given names is a known counter and nothing was sent
def reset(*stats)
  all = %w[failed processed]
  stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
  # Every requested name was unknown: MSET without arguments is a Redis
  # error, so there is nothing to send.
  return if stats.empty?

  mset_args = stats.flat_map { |name| ["stat:#{name}", 0] }
  Sidekiq.redis { |conn| conn.mset(*mset_args) }
end
retry_size() click to toggle source
# File lib/sidekiq/api.rb, line 26
# Cardinality of the "retry" sorted set as captured by fetch_stats_fast!.
#
# @return [Integer]
def retry_size
  stat(:retry_size)
end
scheduled_size() click to toggle source
# File lib/sidekiq/api.rb, line 22
# Cardinality of the "schedule" sorted set as captured by fetch_stats_fast!.
#
# @return [Integer]
def scheduled_size
  stat(:scheduled_size)
end
workers_size() click to toggle source
# File lib/sidekiq/api.rb, line 42
# Sum of the "busy" field over every member of the "processes" set. This
# value is produced by fetch_stats_slow!, which #stat triggers when it has
# not been loaded yet.
#
# @return [Integer]
def workers_size
  stat(:workers_size)
end

Private Instance Methods

stat(s) click to toggle source
# File lib/sidekiq/api.rb, line 140
# Looks up one statistic in @stats, running fetch_stats_slow! first when the
# value has not been loaded yet.
#
# @param s [Symbol] key into @stats
# @return [Object] the stored value
# @raise [ArgumentError] when the key is still absent after the slow fetch
def stat(s)
  value = @stats[s]
  if value.nil?
    # The missing value may be one of the lazily loaded ones.
    fetch_stats_slow!
    value = @stats[s]
  end
  return value if value

  raise ArgumentError, "Unknown stat #{s}"
end