Class: Roundhouse::Queue
- Inherits:
-
Object
- Object
- Roundhouse::Queue
- Includes:
- Enumerable
- Defined in:
- lib/roundhouse/api.rb
Overview
Encapsulates a queue within Roundhouse. Allows enumeration of all jobs within the queue and deletion of jobs.
queue = Roundhouse::Queue.new("mailer")
queue.each do |job|
job.klass # => 'MyWorker'
job.args # => [1, 2, 3]
job.delete if job.jid == 'abcdef1234567890'
end
Instance Attribute Summary collapse
-
#queue_id ⇒ Object
readonly
Returns the value of attribute queue_id.
Class Method Summary collapse
Instance Method Summary collapse
- #bucket ⇒ Object
- #clear ⇒ Object (also: #💣)
- #each ⇒ Object
- #find_job(jid) ⇒ Object
-
#initialize(queue_id) ⇒ Queue
constructor
A new instance of Queue.
- #latency ⇒ Object
-
#paused? ⇒ Boolean
Roundhouse Pro overrides this.
- #size ⇒ Object
- #status ⇒ Object
Constructor Details
#initialize(queue_id) ⇒ Queue
Returns a new instance of Queue.
239 240 241 242 |
# File 'lib/roundhouse/api.rb', line 239 def initialize(queue_id) @queue_id = queue_id.to_i @rname = "queue:#{queue_id}" end |
Instance Attribute Details
#queue_id ⇒ Object (readonly)
Returns the value of attribute queue_id.
237 238 239 |
# File 'lib/roundhouse/api.rb', line 237 def queue_id @queue_id end |
Class Method Details
.all ⇒ Object
231 232 233 234 235 |
# File 'lib/roundhouse/api.rb', line 231 def self.all Roundhouse.redis do |c| c.smembers(Roundhouse::Monitor::BUCKETS).map { |bucket_num| c.hkeys("#{Roundhouse::Monitor::STATUS}:#{bucket_num}") } end.flatten.sort.map {|q| Roundhouse::Queue.new(q) } end |
Instance Method Details
#bucket ⇒ Object
253 254 255 |
# File 'lib/roundhouse/api.rb', line 253 def bucket Roundhouse::Monitor.status_bucket(queue_id) end |
#clear ⇒ Object Also known as: 💣
299 300 301 302 303 304 305 306 |
# File 'lib/roundhouse/api.rb', line 299 def clear Roundhouse.redis do |conn| conn.multi do conn.del(@rname) conn.hdel(bucket, queue_id) end end end |
#each ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/roundhouse/api.rb', line 274 def each initial_size = size deleted_size = 0 page = 0 page_size = 50 loop do range_start = page * page_size - deleted_size range_end = page * page_size - deleted_size + (page_size - 1) entries = Roundhouse.redis do |conn| conn.lrange @rname, range_start, range_end end break if entries.empty? page += 1 entries.each do |entry| yield Job.new(entry, @name) end deleted_size = initial_size - size end end |
#find_job(jid) ⇒ Object
295 296 297 |
# File 'lib/roundhouse/api.rb', line 295 def find_job(jid) detect { |j| j.jid == jid } end |
#latency ⇒ Object
266 267 268 269 270 271 272 |
# File 'lib/roundhouse/api.rb', line 266 def latency entry = Roundhouse.redis do |conn| conn.lrange(@rname, -1, -1) end.first return 0 unless entry Time.now.to_f - Roundhouse.load_json(entry)['enqueued_at'] end |
#paused? ⇒ Boolean
Roundhouse Pro overrides this
262 263 264 |
# File 'lib/roundhouse/api.rb', line 262 def paused? false end |
#size ⇒ Object
257 258 259 |
# File 'lib/roundhouse/api.rb', line 257 def size Roundhouse.redis { |con| con.llen(@rname) } end |
#status ⇒ Object
244 245 246 247 248 249 250 251 |
# File 'lib/roundhouse/api.rb', line 244 def status case Roundhouse.redis { |conn| Roundhouse::Monitor.queue_status(conn, queue_id) } when Roundhouse::Monitor::ACTIVE then :active when Roundhouse::Monitor::EMPTY then :empty when Roundhouse::Monitor::SUSPENDED then :suspended else :unknown end end |