Class: Qless::Queue
- Inherits:
-
Object
- Object
- Qless::Queue
- Defined in:
- lib/qless/queue.rb
Overview
A class for interacting with a specific queue. Not meant to be instantiated directly, it’s accessed with Client#queues
Constant Summary collapse
- QueueNotEmptyError =
Class.new(StandardError)
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #==(other) ⇒ Object (also: #eql?)
- #counts ⇒ Object
- #forget ⇒ Object
- #hash ⇒ Object
- #heartbeat ⇒ Object
- #heartbeat=(value) ⇒ Object
-
#initialize(name, client) ⇒ Queue
constructor
A new instance of Queue.
- #jobs ⇒ Object
-
#length ⇒ Object
How many items in the queue?.
- #max_concurrency ⇒ Object
- #max_concurrency=(value) ⇒ Object
- #pause(opts = {}) ⇒ Object
- #paused? ⇒ Boolean
-
#peek(count = nil) ⇒ Object
Peek at a work item.
-
#pop(count = nil) ⇒ Object
Pop a work item off the queue.
-
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue Options include: => priority (int) => tags (array of strings) => delay (int).
-
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue Options include: => priority (int) => tags (array of strings) => retries (int) => offset (int).
- #stats(date = nil) ⇒ Object
- #to_s ⇒ Object (also: #inspect)
- #unpause ⇒ Object
-
#worker_name ⇒ Object
Our worker name is the same as our client’s.
Constructor Details
#initialize(name, client) ⇒ Queue
Returns a new instance of Queue.
42 43 44 45 |
# File 'lib/qless/queue.rb', line 42 def initialize(name, client) @client = client @name = name end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
40 41 42 |
# File 'lib/qless/queue.rb', line 40 def client @client end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
40 41 42 |
# File 'lib/qless/queue.rb', line 40 def name @name end |
Instance Method Details
#==(other) ⇒ Object Also known as: eql?
174 175 176 177 178 |
# File 'lib/qless/queue.rb', line 174 def ==(other) self.class == other.class && client == other.client && name.to_s == other.name.to_s end |
#counts ⇒ Object
56 57 58 |
# File 'lib/qless/queue.rb', line 56 def counts JSON.parse(@client.call('queues', @name)) end |
#forget ⇒ Object
92 93 94 95 96 97 98 99 |
# File 'lib/qless/queue.rb', line 92 def forget job_count = length if job_count.zero? @client.call('queue.forget', name) else raise QueueNotEmptyError, "The queue is not empty. It has #{job_count} jobs." end end |
#hash ⇒ Object
181 182 183 |
# File 'lib/qless/queue.rb', line 181 def hash self.class.hash ^ client.hash ^ name.to_s.hash end |
#heartbeat ⇒ Object
60 61 62 |
# File 'lib/qless/queue.rb', line 60 def heartbeat get_config :heartbeat end |
#heartbeat=(value) ⇒ Object
64 65 66 |
# File 'lib/qless/queue.rb', line 64 def heartbeat=(value) set_config :heartbeat, value end |
#jobs ⇒ Object
52 53 54 |
# File 'lib/qless/queue.rb', line 52 def jobs @jobs ||= QueueJobs.new(@name, @client) end |
#length ⇒ Object
How many items in the queue?
161 162 163 164 165 166 167 |
# File 'lib/qless/queue.rb', line 161 def length (@client.redis.multi do %w[ locks work scheduled depends ].each do |suffix| @client.redis.zcard("ql:q:#{@name}-#{suffix}") end end).inject(0, :+) end |
#max_concurrency ⇒ Object
68 69 70 71 |
# File 'lib/qless/queue.rb', line 68 def max_concurrency value = get_config('max-concurrency') value && Integer(value) end |
#max_concurrency=(value) ⇒ Object
73 74 75 |
# File 'lib/qless/queue.rb', line 73 def max_concurrency=(value) set_config 'max-concurrency', value end |
#pause(opts = {}) ⇒ Object
81 82 83 84 |
# File 'lib/qless/queue.rb', line 81 def pause(opts = {}) @client.call('pause', name) @client.call('timeout', jobs.running(0, -1)) unless opts[:stopjobs].nil? end |
#paused? ⇒ Boolean
77 78 79 |
# File 'lib/qless/queue.rb', line 77 def paused? counts['paused'] end |
#peek(count = nil) ⇒ Object
Peek at a work item
150 151 152 153 154 |
# File 'lib/qless/queue.rb', line 150 def peek(count = nil) jids = JSON.parse(@client.call('peek', @name, (count || 1))) jobs = jids.map { |j| Job.new(@client, j) } count.nil? ? jobs[0] : jobs end |
#pop(count = nil) ⇒ Object
Pop a work item off the queue
143 144 145 146 147 |
# File 'lib/qless/queue.rb', line 143 def pop(count = nil) jids = JSON.parse(@client.call('pop', @name, worker_name, (count || 1))) jobs = jids.map { |j| Job.new(@client, j) } count.nil? ? jobs[0] : jobs end |
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue Options include:
> priority (int)
> tags (array of strings)
> delay (int)
106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/qless/queue.rb', line 106 def put(klass, data, opts = {}) opts = (klass, data, opts) @client.call('put', worker_name, @name, (opts[:jid] || Qless.generate_jid), klass.is_a?(String) ? klass : klass.name, JSON.generate(data), opts.fetch(:delay, 0), 'priority', opts.fetch(:priority, 0), 'tags', JSON.generate(opts.fetch(:tags, [])), 'retries', opts.fetch(:retries, 5), 'depends', JSON.generate(opts.fetch(:depends, [])) ) end |
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue Options include:
> priority (int)
> tags (array of strings)
> retries (int)
> offset (int)
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/qless/queue.rb', line 126 def recur(klass, data, interval, opts = {}) opts = (klass, data, opts) @client.call( 'recur', @name, (opts[:jid] || Qless.generate_jid), klass.is_a?(String) ? klass : klass.name, JSON.generate(data), 'interval', interval, opts.fetch(:offset, 0), 'priority', opts.fetch(:priority, 0), 'tags', JSON.generate(opts.fetch(:tags, [])), 'retries', opts.fetch(:retries, 5), 'backlog', opts.fetch(:backlog, 0) ) end |
#stats(date = nil) ⇒ Object
156 157 158 |
# File 'lib/qless/queue.rb', line 156 def stats(date = nil) JSON.parse(@client.call('stats', @name, (date || Time.now.to_f))) end |
#to_s ⇒ Object Also known as: inspect
169 170 171 |
# File 'lib/qless/queue.rb', line 169 def to_s "#<Qless::Queue #{@name}>" end |
#unpause ⇒ Object
86 87 88 |
# File 'lib/qless/queue.rb', line 86 def unpause @client.call('unpause', name) end |
#worker_name ⇒ Object
Our worker name is the same as our client’s
48 49 50 |
# File 'lib/qless/queue.rb', line 48 def worker_name @client.worker_name end |