Class: Reqless::Queue
- Inherits:
-
Object
- Object
- Reqless::Queue
- Defined in:
- lib/reqless/queue.rb
Overview
A class for interacting with a specific queue. It is not meant to be instantiated directly; access it through Client#queues.
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #==(other) ⇒ Object (also: #eql?)
- #counts ⇒ Object
- #hash ⇒ Object
- #heartbeat ⇒ Object
- #heartbeat=(value) ⇒ Object
-
#initialize(name, client) ⇒ Queue
constructor
A new instance of Queue.
- #jobs ⇒ Object
-
#length ⇒ Object
How many items are in the queue?
- #pause(opts = {}) ⇒ Object
- #paused? ⇒ Boolean
-
#peek(offset_or_count = nil, count = nil) ⇒ Object
Peek at a work item.
-
#pop(count = nil) ⇒ Object
Pop a work item off the queue.
-
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue. Options include: priority (int), tags (array of strings), delay (int), throttles (array of strings).
-
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue. Options include: priority (int), tags (array of strings), retries (int), offset (int).
- #stats(date = nil) ⇒ Object
- #throttle ⇒ Object
- #to_s ⇒ Object (also: #inspect)
- #unpause ⇒ Object
-
#worker_name ⇒ Object
Our worker name is the same as our client’s.
Constructor Details
#initialize(name, client) ⇒ Queue
Returns a new instance of Queue.
46 47 48 49 |
# File 'lib/reqless/queue.rb', line 46
# Stores the queue name and the client this queue talks through.
#
# @param name the queue's name
# @param client the client object used for all calls made by this queue
def initialize(name, client)
  @name, @client = name, client
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
44 45 46 |
# File 'lib/reqless/queue.rb', line 44
# Reader for the client given to the constructor.
#
# @return the value of @client
def client
  @client
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
44 45 46 |
# File 'lib/reqless/queue.rb', line 44
# Reader for the queue name given to the constructor.
#
# @return the value of @name
def name
  @name
end
Instance Method Details
#==(other) ⇒ Object Also known as: eql?
163 164 165 166 167 |
# File 'lib/reqless/queue.rb', line 163
# Two queues are equal when they are of the same class, share an equal
# client, and have names with the same string form.
#
# @param other the object to compare against
# @return a truthy value when the queues match, falsy otherwise
def ==(other)
  # The class check comes first so other.client / other.name are only
  # called on objects of the same class.
  return false unless self.class == other.class

  client == other.client && name.to_s == other.name.to_s
end
#counts ⇒ Object
60 61 62 |
# File 'lib/reqless/queue.rb', line 60
# Calls 'queue.counts' on the client for this queue and parses the reply
# as JSON.
#
# @return the parsed JSON reply
def counts
  raw_counts = @client.call('queue.counts', @name)
  JSON.parse(raw_counts)
end
#hash ⇒ Object
170 171 172 |
# File 'lib/reqless/queue.rb', line 170
# Hash code built by XOR-ing the hashes of the class, the client, and the
# string form of the name, the same three things #== compares.
#
# @return [Integer]
def hash
  [self.class, client, name.to_s].map(&:hash).reduce(:^)
end
#heartbeat ⇒ Object
64 65 66 |
# File 'lib/reqless/queue.rb', line 64
# Reads this queue's :heartbeat setting through get_config.
#
# @return whatever get_config returns for :heartbeat
def heartbeat
  get_config(:heartbeat)
end
#heartbeat=(value) ⇒ Object
68 69 70 |
# File 'lib/reqless/queue.rb', line 68
# Writes this queue's :heartbeat setting through set_config.
#
# @param value the new heartbeat value
def heartbeat=(value)
  set_config(:heartbeat, value)
end
#jobs ⇒ Object
56 57 58 |
# File 'lib/reqless/queue.rb', line 56
# Memoized QueueJobs accessor for this queue.
#
# @return the QueueJobs built from this queue's name and client; the same
#   object is returned on later calls
def jobs
  return @jobs if @jobs

  @jobs = QueueJobs.new(@name, @client)
end
#length ⇒ Object
How many items in the queue?
150 151 152 153 154 155 156 |
# File 'lib/reqless/queue.rb', line 150
# How many items are in the queue?
#
# Issues ZCARD for the queue's "-locks", "-work" and "-scheduled" keys
# inside one MULTI block and adds up the replies.
#
# @return the sum of the three cardinalities
def length
  key_prefix = "ql:q:#{@name}"
  cardinalities = @client.redis.multi do |pipeline|
    pipeline.zcard("#{key_prefix}-locks")
    pipeline.zcard("#{key_prefix}-work")
    pipeline.zcard("#{key_prefix}-scheduled")
  end
  cardinalities.reduce(0) { |total, size| total + size }
end
#pause(opts = {}) ⇒ Object
80 81 82 83 |
# File 'lib/reqless/queue.rb', line 80
# Pauses this queue and, when asked, times out its running jobs.
#
# @param opts [Hash] options
# @option opts :stopjobs when truthy, also calls 'job.timeout' with the
#   result of jobs.running(0, -1)
# @return the reply of the 'job.timeout' call, or nil when it is skipped
def pause(opts = {})
  @client.call('queue.pause', name)
  # Test truthiness rather than nil-ness so that an explicit
  # `stopjobs: false` does not time out the running jobs.
  return unless opts[:stopjobs]

  @client.call('job.timeout', jobs.running(0, -1))
end
#paused? ⇒ Boolean
76 77 78 |
# File 'lib/reqless/queue.rb', line 76
# Reports the 'paused' entry of this queue's counts.
#
# @return the value stored under 'paused' in #counts
def paused?
  queue_counts = counts
  queue_counts['paused']
end
#peek(offset_or_count = nil, count = nil) ⇒ Object
Peek at a work item
136 137 138 139 140 141 142 143 |
# File 'lib/reqless/queue.rb', line 136
# Peek at a work item.
#
# With no arguments, one entry is requested at offset 0 and a single Job
# (or nil when the reply is empty) is returned. With one argument it is
# used as the count. With two arguments they are offset and count. Any
# call with an argument returns an Array of Jobs.
#
# @param offset_or_count the count when given alone, otherwise the offset
# @param count the number of entries to request when an offset is given
# @return a Job, nil, or an Array of Jobs as described above
def peek(offset_or_count = nil, count = nil)
  if offset_or_count && count
    start = offset_or_count
    limit = count
  else
    start = 0
    limit = offset_or_count || 1
  end

  reply = @client.call('queue.peek', @name, start, limit)
  peeked = JSON.parse(reply).map { |entry| Job.new(@client, entry) }

  wants_single = offset_or_count.nil? && count.nil?
  wants_single ? peeked[0] : peeked
end
#pop(count = nil) ⇒ Object
Pop a work item off the queue
129 130 131 132 133 |
# File 'lib/reqless/queue.rb', line 129
# Pop a work item off the queue.
#
# @param count how many items to request; when nil, one item is requested
#   and a single Job (or nil when the reply is empty) is returned instead
#   of an Array
# @return a Job, nil, or an Array of Jobs
def pop(count = nil)
  reply = @client.call('queue.pop', @name, worker_name, count || 1)
  popped = JSON.parse(reply).map { |entry| Job.new(@client, entry) }
  return popped unless count.nil?

  popped[0]
end
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue. Options include:
> priority (int)
> tags (array of strings)
> delay (int)
> throttles (array of strings)
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/reqless/queue.rb', line 95
# Put the described job in this queue.
#
# The options are first passed through job_options, a helper that is not
# listed on this page (NOTE(review): the listing had lost the helper name,
# leaving the invalid `opts = (klass, data, opts)`; confirm against
# lib/reqless/queue.rb).
#
# @param klass a String class name, or an object responding to #name
# @param data the job payload, merged into the options as :data
# @param opts [Hash] options such as :priority, :tags, :delay, :throttles;
#   :jid overrides the generated job id
# @return the reply of the 'queue.put' call
def put(klass, data, opts = {})
  opts = job_options(klass, data, opts)
  @client.call(
    'queue.put', worker_name, @name,
    (opts[:jid] || Reqless.generate_jid),
    klass.is_a?(String) ? klass : klass.name,
    *Job.build_opts_array(opts.merge(:data => data))
  )
end
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue. Options include:
> priority (int)
> tags (array of strings)
> retries (int)
> offset (int)
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/reqless/queue.rb', line 112
# Make a recurring job in this queue.
#
# The options are first passed through job_options, a helper that is not
# listed on this page (NOTE(review): the listing had lost the helper name,
# leaving the invalid `opts = (klass, data, opts)`; confirm against
# lib/reqless/queue.rb).
#
# @param klass a String class name, or an object responding to #name
# @param data the job payload, sent as JSON
# @param interval the recurrence interval, passed through unchanged
# @param opts [Hash] options: :jid (defaults to a generated id),
#   :offset (default 0), :priority (default 0), :tags (default []),
#   :retries (default 5), :backlog (default 0)
# @return the reply of the 'queue.recurAtInterval' call
def recur(klass, data, interval, opts = {})
  opts = job_options(klass, data, opts)
  @client.call(
    'queue.recurAtInterval',
    @name,
    (opts[:jid] || Reqless.generate_jid),
    klass.is_a?(String) ? klass : klass.name,
    JSON.generate(data),
    interval, opts.fetch(:offset, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'backlog', opts.fetch(:backlog, 0)
  )
end
#stats(date = nil) ⇒ Object
145 146 147 |
# File 'lib/reqless/queue.rb', line 145
# Calls 'queue.stats' on the client for this queue and parses the reply
# as JSON.
#
# @param date the value sent with the call; when nil (or false), the
#   current time as a Float (Time.now.to_f) is sent instead
# @return the parsed JSON reply
def stats(date = nil)
  timestamp = date || Time.now.to_f
  JSON.parse(@client.call('queue.stats', @name, timestamp))
end
#throttle ⇒ Object
72 73 74 |
# File 'lib/reqless/queue.rb', line 72
# Memoized throttle for this queue, keyed "ql:q:<name>".
#
# @return the Reqless::Throttle built from that key and this queue's
#   client; the same object is returned on later calls
def throttle
  return @throttle if @throttle

  @throttle = Reqless::Throttle.new("ql:q:#{name}", client)
end
#to_s ⇒ Object Also known as: inspect
158 159 160 |
# File 'lib/reqless/queue.rb', line 158 def to_s "#<Reqless::Queue #{@name}>" end |
#unpause ⇒ Object
85 86 87 |
# File 'lib/reqless/queue.rb', line 85
# Calls 'queue.unpause' on the client for this queue.
#
# @return the reply of the 'queue.unpause' call
def unpause
  queue_name = name
  @client.call('queue.unpause', queue_name)
end
#worker_name ⇒ Object
Our worker name is the same as our client’s
52 53 54 |
# File 'lib/reqless/queue.rb', line 52
# Our worker name is the same as our client's.
#
# @return the client's worker_name
def worker_name
  @client.worker_name
end