Class: Sidekiq::Queue

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/sidekiq/api.rb

Overview

Encapsulates a queue within Sidekiq. Allows enumeration of all jobs within the queue and deletion of jobs.

queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
  job.args # => [1, 2, 3]
  job.delete if job.jid == 'abcdef1234567890'
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name = "default") ⇒ Queue

Returns a new instance of Queue.



212
213
214
215
# File 'lib/sidekiq/api.rb', line 212

def initialize(name="default")
  @name = name
  @rname = "queue:#{name}"
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



210
211
212
# File 'lib/sidekiq/api.rb', line 210

def name
  @name
end

Class Method Details

.allObject

Return all known queues within Redis.



206
207
208
# File 'lib/sidekiq/api.rb', line 206

def self.all
  Sidekiq.redis { |c| c.smembers('queues'.freeze) }.sort.map { |q| Sidekiq::Queue.new(q) }
end

Instance Method Details

#clearObject Also known as: 💣



272
273
274
275
276
277
278
279
# File 'lib/sidekiq/api.rb', line 272

def clear
  Sidekiq.redis do |conn|
    conn.multi do
      conn.del(@rname)
      conn.srem("queues".freeze, name)
    end
  end
end

#eachObject



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/sidekiq/api.rb', line 242

def each
  initial_size = size
  deleted_size = 0
  page = 0
  page_size = 50

  while true do
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries = Sidekiq.redis do |conn|
      conn.lrange @rname, range_start, range_end
    end
    break if entries.empty?
    page += 1
    entries.each do |entry|
      yield Job.new(entry, @name)
    end
    deleted_size = initial_size - size
  end
end

#find_job(jid) ⇒ Object

Find the job with the given JID within this queue.

This is a slow, inefficient operation. Do not use under normal conditions. Sidekiq Pro contains a faster version.



268
269
270
# File 'lib/sidekiq/api.rb', line 268

def find_job(jid)
  detect { |j| j.jid == jid }
end

#latencyObject

Calculates this queue’s latency, the difference in seconds since the oldest job in the queue was enqueued.

Returns:

  • Float



231
232
233
234
235
236
237
238
239
240
# File 'lib/sidekiq/api.rb', line 231

def latency
  entry = Sidekiq.redis do |conn|
    conn.lrange(@rname, -1, -1)
  end.first
  return 0 unless entry
  job = Sidekiq.load_json(entry)
  now = Time.now.to_f
  thence = job['enqueued_at'] || now
  now - thence
end

#paused?Boolean

Sidekiq Pro overrides this

Returns:

  • (Boolean)


222
223
224
# File 'lib/sidekiq/api.rb', line 222

def paused?
  false
end

#sizeObject



217
218
219
# File 'lib/sidekiq/api.rb', line 217

def size
  Sidekiq.redis { |con| con.llen(@rname) }
end