Class: Qless::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/qless/queue.rb

Overview

A class for interacting with a specific queue. It is not meant to be instantiated directly; access it through Client#queues.

Constant Summary collapse

# Error raised by #forget when the queue still reports jobs.
class QueueNotEmptyError < StandardError; end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, client) ⇒ Queue

Returns a new instance of Queue.



42
43
44
45
# File 'lib/qless/queue.rb', line 42

# Builds a queue handle that remembers its name and the client it talks
# through. Both values are stored exactly as given.
#
# @param name [Object] the queue's name
# @param client [Object] the client used for every call this queue makes
def initialize(name, client)
  @name, @client = name, client
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



40
41
42
# File 'lib/qless/queue.rb', line 40

# Read-only accessor for the client this queue was constructed with.
#
# @return [Object] the value held in @client
def client
  @client
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



40
41
42
# File 'lib/qless/queue.rb', line 40

# Read-only accessor for the name this queue was constructed with.
#
# @return [Object] the value held in @name
def name
  @name
end

Instance Method Details

#==(other) ⇒ Object Also known as: eql?



174
175
176
177
178
# File 'lib/qless/queue.rb', line 174

# Equality: two queues match when they are instances of the same class,
# their clients compare equal, and their names have the same string form
# (so a Symbol name and a String name can match).
#
# @param other [Object] the object to compare against
# @return [Boolean]
def ==(other)
  # Bail out before touching other.client / other.name on foreign objects.
  return false unless self.class == other.class

  client == other.client && name.to_s == other.name.to_s
end

#counts ⇒ Object



56
57
58
# File 'lib/qless/queue.rb', line 56

# Issues the 'queues' command for this queue through the client and
# decodes the JSON reply.
#
# @return [Object] the parsed JSON document
def counts
  reply = @client.call('queues', @name)
  JSON.parse(reply)
end

#forget ⇒ Object



92
93
94
95
96
97
98
99
# File 'lib/qless/queue.rb', line 92

# Asks the server to forget this queue, but only when #length reports
# zero jobs.
#
# @return [Object] whatever the 'queue.forget' call returns
# @raise [QueueNotEmptyError] if the queue still holds jobs
def forget
  remaining = length
  unless remaining.zero?
    raise QueueNotEmptyError, "The queue is not empty. It has #{remaining} jobs."
  end

  @client.call('queue.forget', name)
end

#hash ⇒ Object



181
182
183
# File 'lib/qless/queue.rb', line 181

# Hash code built from the same three ingredients that #== compares: the
# class, the client, and the string form of the name.
#
# @return [Integer] XOR of the three component hashes
def hash
  [self.class, client, name.to_s].map(&:hash).reduce(:^)
end

#heartbeat ⇒ Object



60
61
62
# File 'lib/qless/queue.rb', line 60

# Reads this queue's :heartbeat setting via get_config.
#
# @return [Object] whatever get_config returns for :heartbeat
def heartbeat
  get_config(:heartbeat)
end

#heartbeat=(value) ⇒ Object



64
65
66
# File 'lib/qless/queue.rb', line 64

# Stores a new :heartbeat setting for this queue via set_config.
#
# @param value [Object] the new heartbeat value, passed through unchanged
def heartbeat=(value)
  set_config(:heartbeat, value)
end

#jobs ⇒ Object



52
53
54
# File 'lib/qless/queue.rb', line 52

# Memoized QueueJobs accessor for this queue: built from the queue's name
# and client on first use, then reused on later calls.
#
# @return [QueueJobs]
def jobs
  return @jobs if @jobs

  @jobs = QueueJobs.new(@name, @client)
end

#length ⇒ Object

How many items in the queue?



161
162
163
164
165
166
167
# File 'lib/qless/queue.rb', line 161

# Counts the jobs in this queue by summing the cardinalities of its four
# sorted sets (locks, work, scheduled, depends), read in a single MULTI.
#
# The ZCARD commands are issued on the object yielded to the +multi+
# block rather than on the outer connection. Newer redis-rb releases only
# queue commands sent to the yielded object; anything sent to the outer
# connection is left out of the transaction's result, which would make
# this method report 0 for a non-empty queue.
#
# @return [Integer] total number of entries across the four sets
def length
  cardinalities = @client.redis.multi do |transaction|
    %w[locks work scheduled depends].each do |suffix|
      transaction.zcard("ql:q:#{@name}-#{suffix}")
    end
  end
  cardinalities.inject(0, :+)
end

#max_concurrency ⇒ Object



68
69
70
71
# File 'lib/qless/queue.rb', line 68

# Reads the 'max-concurrency' setting via get_config and converts it with
# Integer(). A nil or false setting is returned unchanged.
#
# @return [Integer, nil]
# @raise [ArgumentError] if the stored value is not a valid integer string
def max_concurrency
  raw = get_config('max-concurrency')
  raw ? Integer(raw) : raw
end

#max_concurrency=(value) ⇒ Object



73
74
75
# File 'lib/qless/queue.rb', line 73

# Stores a new 'max-concurrency' setting for this queue via set_config.
#
# @param value [Object] the new limit, passed through unchanged
def max_concurrency=(value)
  set_config('max-concurrency', value)
end

#pause(opts = {}) ⇒ Object



81
82
83
84
# File 'lib/qless/queue.rb', line 81

# Pauses this queue and, when asked to, also times out the jobs that
# jobs.running(0, -1) reports.
#
# The stop step runs only when :stopjobs is truthy. Testing for "not nil"
# would treat an explicit `stopjobs: false` as a request to stop jobs.
#
# @param opts [Hash] options
# @option opts [Boolean] :stopjobs also issue 'timeout' for running jobs
# @return [Object, nil] the 'timeout' call's result when jobs were
#   stopped, otherwise nil
def pause(opts = {})
  @client.call('pause', name)
  @client.call('timeout', jobs.running(0, -1)) if opts[:stopjobs]
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/qless/queue.rb', line 77

# Reports the 'paused' entry of this queue's #counts.
#
# @return [Boolean, nil] the stored flag; nil when #counts has no
#   'paused' key
def paused?
  queue_counts = counts
  queue_counts['paused']
end

#peek(count = nil) ⇒ Object

Peek at a work item



150
151
152
153
154
# File 'lib/qless/queue.rb', line 150

# Looks at upcoming work without popping it: issues 'peek' for this queue
# (asking for one item when no count is given) and wraps each decoded
# entry in a Job.
#
# @param count [Integer, nil] how many items to request
# @return [Job, nil, Array<Job>] a single Job (or nil when nothing came
#   back) if count was omitted; otherwise an array of Jobs
def peek(count = nil)
  reply = @client.call('peek', @name, count || 1)
  found = JSON.parse(reply).map { |attributes| Job.new(@client, attributes) }
  return found[0] if count.nil?

  found
end

#pop(count = nil) ⇒ Object

Pop a work item off the queue



143
144
145
146
147
# File 'lib/qless/queue.rb', line 143

# Pops work off this queue on behalf of #worker_name: issues 'pop' (asking
# for one item when no count is given) and wraps each decoded entry in a
# Job.
#
# @param count [Integer, nil] how many items to request
# @return [Job, nil, Array<Job>] a single Job (or nil when nothing came
#   back) if count was omitted; otherwise an array of Jobs
def pop(count = nil)
  reply = @client.call('pop', @name, worker_name, count || 1)
  popped = JSON.parse(reply).map { |attributes| Job.new(@client, attributes) }
  return popped[0] if count.nil?

  popped
end

#put(klass, data, opts = {}) ⇒ Object

Put the described job in this queue. Options include:

> priority (int)

> tags (array of strings)

> delay (int)



106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/qless/queue.rb', line 106

# Enqueues a job in this queue by issuing the 'put' command.
#
# The options are first passed through job_options(klass, data, opts);
# the keys read afterwards are :jid, :delay, :priority, :tags, :retries
# and :depends.
#
# @param klass [Class, String] the job class, or its name as a String
# @param data [Object] job payload; serialized with JSON.generate
# @param opts [Hash] job options
# @option opts [String] :jid job id (defaults to Qless.generate_jid)
# @option opts [Integer] :delay (0)
# @option opts [Integer] :priority (0)
# @option opts [Array<String>] :tags ([])
# @option opts [Integer] :retries (5)
# @option opts [Array] :depends ([])
# @return [Object] whatever the client call returns
def put(klass, data, opts = {})
  opts = job_options(klass, data, opts)
  jid = opts[:jid] || Qless.generate_jid
  klass_name = klass.is_a?(String) ? klass : klass.name

  @client.call(
    'put', worker_name, @name, jid, klass_name,
    JSON.generate(data),
    opts.fetch(:delay, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'depends', JSON.generate(opts.fetch(:depends, []))
  )
end

#recur(klass, data, interval, opts = {}) ⇒ Object

Make a recurring job in this queue. Options include:

> priority (int)

> tags (array of strings)

> retries (int)

> offset (int)



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/qless/queue.rb', line 126

# Registers a recurring job in this queue by issuing the 'recur' command.
#
# The options are first passed through job_options(klass, data, opts);
# the keys read afterwards are :jid, :offset, :priority, :tags, :retries
# and :backlog.
#
# @param klass [Class, String] the job class, or its name as a String
# @param data [Object] job payload; serialized with JSON.generate
# @param interval [Object] sent after the 'interval' keyword, unchanged
# @param opts [Hash] job options
# @option opts [String] :jid job id (defaults to Qless.generate_jid)
# @option opts [Integer] :offset (0)
# @option opts [Integer] :priority (0)
# @option opts [Array<String>] :tags ([])
# @option opts [Integer] :retries (5)
# @option opts [Integer] :backlog (0)
# @return [Object] whatever the client call returns
def recur(klass, data, interval, opts = {})
  opts = job_options(klass, data, opts)
  jid = opts[:jid] || Qless.generate_jid
  klass_name = klass.is_a?(String) ? klass : klass.name
  payload = JSON.generate(data)
  tags = JSON.generate(opts.fetch(:tags, []))

  @client.call('recur', @name, jid, klass_name, payload,
               'interval', interval, opts.fetch(:offset, 0),
               'priority', opts.fetch(:priority, 0),
               'tags', tags,
               'retries', opts.fetch(:retries, 5),
               'backlog', opts.fetch(:backlog, 0))
end

#stats(date = nil) ⇒ Object



156
157
158
# File 'lib/qless/queue.rb', line 156

# Issues the 'stats' command for this queue and decodes the JSON reply.
#
# @param date [Object, nil] value sent as the stats timestamp; the current
#   time as a Float (Time.now.to_f) is used when omitted
# @return [Object] the parsed JSON document
def stats(date = nil)
  timestamp = date || Time.now.to_f
  reply = @client.call('stats', @name, timestamp)
  JSON.parse(reply)
end

#to_s ⇒ Object Also known as: inspect



169
170
171
# File 'lib/qless/queue.rb', line 169

# Short human-readable form of the queue, embedding its name.
#
# @return [String] e.g. "#<Qless::Queue emails>" for a queue named emails
def to_s
  "#<Qless::Queue #{@name}>"
end

#unpause ⇒ Object



86
87
88
# File 'lib/qless/queue.rb', line 86

# Issues the 'unpause' command for this queue through the client.
#
# @return [Object] whatever the client call returns
def unpause
  @client.call('unpause', name)
end

#worker_name ⇒ Object

Our worker name is the same as our client’s



48
49
50
# File 'lib/qless/queue.rb', line 48

# The worker name this queue uses for its calls; delegated to the client
# so the queue and its client always agree.
#
# @return [Object] the client's worker_name
def worker_name
  @client.worker_name
end