Class: Sidekiq::Workers

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/sidekiq/api.rb

Overview

A worker is a thread that is currently processing a job. This class provides programmatic access to the set of currently active workers.

WARNING WARNING WARNING

This is live data that can change every millisecond. If you call #size => 5 and then expect #each to be called 5 times, you’re going to have a bad time.

workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
  # process_id is a unique identifier per Sidekiq process
  # thread_id is a unique identifier per thread
  # work is a Hash which looks like:
  # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
  # run_at is an epoch Integer.
end
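As one illustrative sketch (the helper name and threshold are my own, not part of Sidekiq's API), the [process_id, thread_id, work] tuples yielded by #each can be filtered to flag jobs that have been busy for too long; since run_at is an epoch Integer, elapsed time is a plain subtraction:

```ruby
# Illustrative helper (not part of Sidekiq): given the tuples that
# Sidekiq::Workers#each yields, return those whose job has been
# running for more than `threshold` seconds.
def long_running(tuples, threshold:, now: Time.now.to_i)
  tuples.select { |(_pid, _tid, work)| now - work["run_at"] > threshold }
end

# Against live data this might be used as:
#   tuples = Sidekiq::Workers.new.to_a   # Workers includes Enumerable
#   long_running(tuples, threshold: 300) # jobs busy for over 5 minutes
```

Remember the warning above: the set is live data, so a job flagged here may finish before you act on it.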


Instance Method Details

#each(&block) ⇒ Object



# File 'lib/sidekiq/api.rb', line 919

def each(&block)
  results = []
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a
    procs.sort.each do |key|
      valid, workers = conn.pipelined {
        conn.exists?(key)
        conn.hgetall("#{key}:workers")
      }
      next unless valid
      workers.each_pair do |tid, json|
        hsh = Sidekiq.load_json(json)
        p = hsh["payload"]
        # avoid breaking API, this is a side effect of the JSON optimization in #4316
        hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
        results << [key, tid, hsh]
      end
    end
  end

  results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
end

#size ⇒ Object

Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds. It is NOT real-time.

This is not very efficient if you have lots of Sidekiq processes, but the alternative is a global counter, which can easily get out of sync when processes crash.
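To make the trade-off concrete: #size sums the per-process "busy" counters that each heartbeat writes, so the total is at most as stale as the last heartbeat. A minimal sketch of that aggregation over plain hashes (no Redis; the helper name is mine):

```ruby
# Mirrors the aggregation in #size: each Sidekiq process periodically
# writes a "busy" counter during its heartbeat, and the reported size
# is simply the sum of those counters. Redis returns the values as
# strings, hence the to_i.
def total_busy(process_hashes)
  process_hashes.sum { |h| h["busy"].to_i }
end
```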



# File 'lib/sidekiq/api.rb', line 948

def size
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a
    if procs.empty?
      0
    else
      conn.pipelined {
        procs.each do |key|
          conn.hget(key, "busy")
        end
      }.sum(&:to_i)
    end
  end
end