Class: Qless::Queue
- Inherits:
-
Object
- Object
- Qless::Queue
- Defined in:
- lib/qless/queue.rb
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#worker_name ⇒ Object
Returns the value of attribute worker_name.
Instance Method Summary collapse
- #counts ⇒ Object
- #heartbeat ⇒ Object
- #heartbeat=(value) ⇒ Object
-
#initialize(name, client) ⇒ Queue
constructor
A new instance of Queue.
- #jobs ⇒ Object
-
#length ⇒ Object
How many items are in the queue?
- #max_concurrency ⇒ Object
- #max_concurrency=(value) ⇒ Object
- #pause ⇒ Object
-
#peek(count = nil) ⇒ Object
Peek at a work item.
-
#pop(count = nil) ⇒ Object
Pop a work item off the queue.
-
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue. Options include: priority (int), tags (array of strings), delay (int).
-
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue. Options include: priority (int), tags (array of strings), retries (int), offset (int).
- #stats(date = nil) ⇒ Object
- #to_s ⇒ Object (also: #inspect)
- #unpause ⇒ Object
Constructor Details
#initialize(name, client) ⇒ Queue
Returns a new instance of Queue.
37 38 39 40 41 |
# File 'lib/qless/queue.rb', line 37

# Builds a queue handle.
#
# @param name [Object] queue name, stored as-is in @name
# @param client [Object] client object, stored as-is in @client
# @return [Queue] a new instance of Queue
def initialize(name, client)
  @client = client
  @name = name
  # The worker name starts out as the library-wide Qless.worker_name and can
  # be replaced afterwards through the worker_name writer.
  self.worker_name = Qless.worker_name
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
34 35 36 |
# File 'lib/qless/queue.rb', line 34

# Read-only accessor for the client this queue was built with.
#
# @return [Object] the value of @client
def client
  @client
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
34 35 36 |
# File 'lib/qless/queue.rb', line 34

# Read-only accessor for the queue name.
#
# @return [Object] the value of @name
def name
  @name
end
#worker_name ⇒ Object
Returns the value of attribute worker_name.
35 36 37 |
# File 'lib/qless/queue.rb', line 35

# Reader half of the worker_name attribute.
#
# @return [Object] the value of @worker_name
def worker_name
  @worker_name
end
Instance Method Details
#counts ⇒ Object
47 48 49 |
# File 'lib/qless/queue.rb', line 47

# Calls the client's `_queues` script for this queue and parses its JSON reply.
#
# The script receives no keys and the arguments
# `[current time as integer seconds, queue name]`.
#
# @return [Object] the parsed JSON document returned by the script
# @raise [JSON::ParserError] if the reply is not valid JSON
def counts
  JSON.parse(@client._queues.call([], [Time.now.to_i, @name]))
end
#heartbeat ⇒ Object
51 52 53 |
# File 'lib/qless/queue.rb', line 51

# Reads this queue's `:heartbeat` setting through `get_config`.
#
# @return [Object] whatever `get_config(:heartbeat)` returns
def heartbeat
  get_config :heartbeat
end
#heartbeat=(value) ⇒ Object
55 56 57 |
# File 'lib/qless/queue.rb', line 55

# Writes this queue's `:heartbeat` setting through `set_config`.
#
# @param value [Object] new heartbeat value, passed through unchanged
# @return [Object] whatever `set_config` returns (only visible via `send`;
#   an assignment expression evaluates to `value`)
def heartbeat=(value)
  set_config :heartbeat, value
end
#jobs ⇒ Object
43 44 45 |
# File 'lib/qless/queue.rb', line 43

# Returns the QueueJobs helper for this queue.
#
# The helper is built once from the queue name and client, then memoized in
# @jobs so later calls return the same object.
#
# @return [QueueJobs]
def jobs
  @jobs ||= QueueJobs.new(@name, @client)
end
#length ⇒ Object
How many items in the queue?
137 138 139 140 141 142 143 |
# File 'lib/qless/queue.rb', line 137

# How many items are in the queue?
#
# Sums the cardinalities of the three sorted sets kept for this queue
# (`-locks`, `-work` and `-scheduled`), read inside one MULTI transaction.
#
# @return [Integer] total number of entries across the three sets
def length
  # Issue the commands on the object yielded by `multi` rather than on the
  # client itself: newer redis-rb releases only queue commands sent to the
  # yielded transaction, while older ones yield the client, so this form
  # works with both.
  counts = @client.redis.multi do |tx|
    tx.zcard("ql:q:#{@name}-locks")
    tx.zcard("ql:q:#{@name}-work")
    tx.zcard("ql:q:#{@name}-scheduled")
  end
  counts.inject(0, :+)
end
#max_concurrency ⇒ Object
59 60 61 62 |
# File 'lib/qless/queue.rb', line 59

# Reads this queue's `max-concurrency` setting through `get_config`.
#
# @return [Integer, nil] the setting converted with `Integer()`, or the
#   falsy value `get_config` returned when nothing is set
# @raise [ArgumentError] if the stored value is not a valid integer string
def max_concurrency
  value = get_config(:"max-concurrency")
  value && Integer(value)
end
#max_concurrency=(value) ⇒ Object
64 65 66 |
# File 'lib/qless/queue.rb', line 64

# Writes this queue's `max-concurrency` setting through `set_config`.
#
# @param value [Object] new limit, passed through unchanged
# @return [Object] whatever `set_config` returns (only visible via `send`;
#   an assignment expression evaluates to `value`)
def max_concurrency=(value)
  set_config :"max-concurrency", value
end
#pause ⇒ Object
68 69 70 |
# File 'lib/qless/queue.rb', line 68

# Calls the client's `_pause` script with no keys and this queue's name as
# the only argument.
#
# @return [Object] whatever the script call returns
def pause
  @client._pause.call([], [name])
end
#peek(count = nil) ⇒ Object
Peek at a work item
127 128 129 130 |
# File 'lib/qless/queue.rb', line 127

# Peek at a work item.
#
# Calls the client's `_peek` script with this queue's name as the key and
# `[count or 1, current time as float]` as arguments, then wraps every JSON
# entry of the reply in a Job.
#
# @param count [Integer, nil] how many jobs to ask for; nil asks for one
# @return [Job, nil, Array<Job>] a single Job (nil when the reply is empty)
#   if count is nil, otherwise the array of Jobs, even for `count == 1`
def peek(count = nil)
  raw = @client._peek.call([@name], [(count || 1), Time.now.to_f])
  results = raw.map { |j| Job.new(@client, JSON.parse(j)) }
  count.nil? ? results[0] : results
end
#pop(count = nil) ⇒ Object
Pop a work item off the queue
121 122 123 124 |
# File 'lib/qless/queue.rb', line 121

# Pop a work item off the queue.
#
# Calls the client's `_pop` script with this queue's name as the key and
# `[worker_name, count or 1, current time as float]` as arguments, then wraps
# every JSON entry of the reply in a Job.
#
# @param count [Integer, nil] how many jobs to ask for; nil asks for one
# @return [Job, nil, Array<Job>] a single Job (nil when the reply is empty)
#   if count is nil, otherwise the array of Jobs, even for `count == 1`
def pop(count = nil)
  raw = @client._pop.call([@name], [worker_name, (count || 1), Time.now.to_f])
  results = raw.map { |j| Job.new(@client, JSON.parse(j)) }
  count.nil? ? results[0] : results
end
#put(klass, data, opts = {}) ⇒ Object
Put the described job in this queue. Options include:
=> priority (int)
=> tags (array of strings)
=> delay (int)
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/qless/queue.rb', line 81

# Put the described job in this queue.
#
# Calls the client's `_put` script with this queue's name as the key.
#
# @param klass [#name] job class; its `name` is sent to the script
# @param data [Object] job payload, serialized with JSON.generate
# @param opts [Hash] optional settings read by this method:
#   :jid (default: Qless.generate_jid), :delay (default 0),
#   :priority (default 0), :tags (default []), :retries (default 5),
#   :depends (default [])
# @return [Object] whatever the script call returns
def put(klass, data, opts = {})
  # NOTE(review): the listing had lost the callee here and read
  # `opts = (klass, data, opts)`, which is not valid Ruby. `job_options` is
  # restored on the assumption that it is the private helper defined in
  # lib/qless/queue.rb -- confirm against the source file.
  opts = job_options(klass, data, opts)
  @client._put.call([@name], [
    (opts[:jid] || Qless.generate_jid),
    klass.name,
    JSON.generate(data),
    Time.now.to_f,
    opts.fetch(:delay, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'depends', JSON.generate(opts.fetch(:depends, []))
  ])
end
#recur(klass, data, interval, opts = {}) ⇒ Object
Make a recurring job in this queue. Options include:
=> priority (int)
=> tags (array of strings)
=> retries (int)
=> offset (int)
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/qless/queue.rb', line 103

# Make a recurring job in this queue.
#
# Calls the client's `_recur` script with no keys; the queue name travels in
# the argument list after the literal 'on'.
#
# @param klass [#to_s] job class; its `to_s` is sent to the script
# @param data [Object] job payload, serialized with JSON.generate
# @param interval [Object] recurrence interval, passed through unchanged
# @param opts [Hash] optional settings read by this method:
#   :jid (default: Qless.generate_jid), :offset (default 0),
#   :priority (default 0), :tags (default []), :retries (default 5)
# @return [Object] whatever the script call returns
def recur(klass, data, interval, opts = {})
  # NOTE(review): the listing had lost the callee here and read
  # `opts = (klass, data, opts)`, which is not valid Ruby. `job_options` is
  # restored on the assumption that it is the private helper defined in
  # lib/qless/queue.rb -- confirm against the source file.
  opts = job_options(klass, data, opts)
  @client._recur.call([], [
    'on',
    @name,
    (opts[:jid] || Qless.generate_jid),
    klass.to_s,
    JSON.generate(data),
    Time.now.to_f,
    'interval', interval, opts.fetch(:offset, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5)
  ])
end
#stats(date = nil) ⇒ Object
132 133 134 |
# File 'lib/qless/queue.rb', line 132

# Calls the client's `_stats` script for this queue and parses its JSON reply.
#
# The script receives no keys and the arguments `[queue name, date]`.
#
# @param date [Object, nil] value sent as the date argument; when nil (or
#   false) the current time as a float is sent instead
# @return [Object] the parsed JSON document returned by the script
# @raise [JSON::ParserError] if the reply is not valid JSON
def stats(date = nil)
  JSON.parse(@client._stats.call([], [@name, (date || Time.now.to_f)]))
end
#to_s ⇒ Object Also known as: inspect
145 146 147 |
# File 'lib/qless/queue.rb', line 145 def to_s "#<Qless::Queue #{@name}>" end |
#unpause ⇒ Object
72 73 74 |
# File 'lib/qless/queue.rb', line 72

# Calls the client's `_unpause` script with no keys and this queue's name as
# the only argument.
#
# @return [Object] whatever the script call returns
def unpause
  @client._unpause.call([], [name])
end