Class: Roundhouse::Queue

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/roundhouse/api.rb

Overview

Encapsulates a queue within Roundhouse. Allows enumeration of all jobs within the queue and deletion of jobs.

# NOTE(review): Queue#initialize stores queue_id.to_i, so a non-numeric name
# such as "mailer" yields queue_id 0 -- presumably a numeric id is intended
# here; confirm.
queue = Roundhouse::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
  job.args # => [1, 2, 3]
  job.delete if job.jid == 'abcdef1234567890'
end

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_id) ⇒ Queue

Returns a new instance of Queue.



239
240
241
242
# File 'lib/roundhouse/api.rb', line 239

# Builds a handle for a single Roundhouse queue.
#
# The Redis list key is derived from the argument exactly as given, while the
# stored queue id is the argument's integer conversion.
#
# @param queue_id [#to_i, #to_s] identifier of the queue
def initialize(queue_id)
  @rname = "queue:#{queue_id}"
  @queue_id = queue_id.to_i
end

Instance Attribute Details

#queue_id ⇒ Object (readonly)

Returns the value of attribute queue_id.



237
238
239
# File 'lib/roundhouse/api.rb', line 237

# Read-only accessor for the queue id.
#
# @return [Object] the value held in @queue_id (set by #initialize from
#   queue_id.to_i)
def queue_id
  @queue_id
end

Class Method Details

.all ⇒ Object



231
232
233
234
235
# File 'lib/roundhouse/api.rb', line 231

# Builds a Queue for every queue id recorded in the monitor's status hashes.
#
# Reads the members of the Roundhouse::Monitor::BUCKETS set, collects the
# field names of each "<STATUS>:<bucket>" hash, then flattens and sorts the
# raw values before wrapping each one in a Roundhouse::Queue.
#
# NOTE(review): the ids are sorted as returned by Redis -- presumably strings,
# which would make the order lexical ("10" before "2"); confirm.
#
# @return [Array<Roundhouse::Queue>] one queue per recorded id, in sorted order
def self.all
  ids_per_bucket = Roundhouse.redis do |conn|
    conn.smembers(Roundhouse::Monitor::BUCKETS).map do |bucket_num|
      conn.hkeys("#{Roundhouse::Monitor::STATUS}:#{bucket_num}")
    end
  end

  ids_per_bucket.flatten.sort.map { |id| Roundhouse::Queue.new(id) }
end

Instance Method Details

#bucket ⇒ Object



253
254
255
# File 'lib/roundhouse/api.rb', line 253

# Looks up the status bucket for this queue by delegating to
# Roundhouse::Monitor.status_bucket with this queue's id.
#
# @return [Object] whatever Monitor.status_bucket returns; #clear uses it as
#   the hash key for HDEL
def bucket
  Roundhouse::Monitor.status_bucket(queue_id)
end

#clear ⇒ Object Also known as: 💣



299
300
301
302
303
304
305
306
# File 'lib/roundhouse/api.rb', line 299

# Removes the queue from Redis inside a single MULTI/EXEC transaction:
# deletes the queue's list and removes its field from the status-bucket hash.
#
# Commands are issued on the object yielded to the +multi+ block rather than on
# the outer connection. Newer redis-rb releases only queue commands sent to the
# yielded transaction object; older releases yield the client itself, so this
# form works on both.
#
# @return [Object] whatever the +multi+ call returns
def clear
  Roundhouse.redis do |conn|
    conn.multi do |transaction|
      transaction.del(@rname)
      transaction.hdel(bucket, queue_id)
    end
  end
end

#each ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/roundhouse/api.rb', line 274

# Yields a Job for every entry in the queue's Redis list, reading the list in
# pages of 50 entries.
#
# After each page the list length is re-read and the following page's offsets
# are shifted back by the number of entries that have disappeared since
# iteration began, so deleting jobs from within the block does not cause the
# remaining entries to be skipped.
#
# @yieldparam job [Job] wrapper built from the raw list entry and this queue's id
# @return [nil] once a page comes back empty
def each
  initial_size = size
  deleted_size = 0
  page = 0
  page_size = 50

  loop do
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries = Roundhouse.redis do |conn|
      conn.lrange @rname, range_start, range_end
    end
    break if entries.empty?
    page += 1
    entries.each do |entry|
      # Pass the queue id: this class never assigns @name, so the previous
      # `@name` argument was always nil.
      # NOTE(review): assumes Job's second argument is the owning queue's id --
      # confirm against Job#initialize.
      yield Job.new(entry, @queue_id)
    end
    # Entries removed during this page shift the list; compensate next offsets.
    deleted_size = initial_size - size
  end
end

#find_job(jid) ⇒ Object



295
296
297
# File 'lib/roundhouse/api.rb', line 295

# Scans the queue for the job with the given job id.
#
# @param jid [Object] job id to compare against each job's +jid+
# @return [Object, nil] the first enumerated job whose +jid+ equals the
#   argument, or nil when none matches
def find_job(jid)
  find { |job| job.jid == jid }
end

#latency ⇒ Object



266
267
268
269
270
271
272
# File 'lib/roundhouse/api.rb', line 266

# Time elapsed since the entry at the tail of the queue's Redis list
# (index -1) was enqueued.
#
# NOTE(review): presumably the tail entry is the oldest job and 'enqueued_at'
# is an epoch timestamp in seconds -- confirm against the enqueueing code.
#
# @return [Numeric] 0 when the list is empty; otherwise the difference between
#   the current time (as a float) and the entry's 'enqueued_at'. A payload
#   without 'enqueued_at' is treated as enqueued just now (zero latency)
#   instead of raising a TypeError.
def latency
  entry = Roundhouse.redis do |conn|
    conn.lrange(@rname, -1, -1)
  end.first
  return 0 unless entry

  now = Time.now.to_f
  enqueued_at = Roundhouse.load_json(entry)['enqueued_at'] || now
  now - enqueued_at
end

#paused? ⇒ Boolean

Roundhouse Pro overrides this

Returns:

  • (Boolean)


262
263
264
# File 'lib/roundhouse/api.rb', line 262

# Reports whether the queue is paused. This implementation always answers
# false.
#
# @return [Boolean] always false here
def paused?
  false
end

#size ⇒ Object



257
258
259
# File 'lib/roundhouse/api.rb', line 257

# Current length of the queue's Redis list, obtained with LLEN.
#
# @return [Object] the value returned by LLEN for this queue's list key
def size
  Roundhouse.redis do |conn|
    conn.llen(@rname)
  end
end

#status ⇒ Object



244
245
246
247
248
249
250
251
# File 'lib/roundhouse/api.rb', line 244

# Translates the monitor's recorded status for this queue into a symbol.
#
# The raw status comes from Roundhouse::Monitor.queue_status and is matched
# against the Monitor::ACTIVE, Monitor::EMPTY and Monitor::SUSPENDED constants.
#
# @return [Symbol] :active, :empty or :suspended, or :unknown when the raw
#   status matches none of those constants
def status
  raw_status = Roundhouse.redis do |conn|
    Roundhouse::Monitor.queue_status(conn, queue_id)
  end

  case raw_status
  when Roundhouse::Monitor::ACTIVE
    :active
  when Roundhouse::Monitor::EMPTY
    :empty
  when Roundhouse::Monitor::SUSPENDED
    :suspended
  else
    :unknown
  end
end