Class: Reqless::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/reqless/queue.rb

Overview

A class for interacting with a specific queue. It is not meant to be instantiated directly; it is accessed through Client#queues.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, client) ⇒ Queue

Returns a new instance of Queue.



46
47
48
49
# File 'lib/reqless/queue.rb', line 46

# Builds a queue handle from a queue name and the client used to reach it.
#
# @param name the queue's name, stored as given in @name
# @param client the client object, stored in @client for later calls
def initialize(name, client)
  @client, @name = client, name
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



44
45
46
# File 'lib/reqless/queue.rb', line 44

# Reader for the @client instance variable (the client handed to the
# constructor).
def client
  @client
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



44
45
46
# File 'lib/reqless/queue.rb', line 44

# Reader for the @name instance variable (the queue name handed to the
# constructor).
def name
  @name
end

Instance Method Details

#==(other) ⇒ Object Also known as: eql?



163
164
165
166
167
# File 'lib/reqless/queue.rb', line 163

# Two queues are equal when they are of the same class, share an equal
# client, and have names whose string forms match (so 'q' and :q compare
# equal).
#
# @param other the object to compare against
# @return the result of the short-circuited comparison chain
def ==(other)
  same_class = self.class == other.class
  same_class && client == other.client && name.to_s == other.name.to_s
end

#counts ⇒ Object



60
61
62
# File 'lib/reqless/queue.rb', line 60

# Asks the client for this queue's 'queue.counts' and returns the decoded
# JSON reply.
#
# @return the parsed JSON structure returned by the 'queue.counts' call
def counts
  payload = @client.call('queue.counts', @name)
  JSON.parse(payload)
end

#hash ⇒ Object



170
171
172
# File 'lib/reqless/queue.rb', line 170

# Hash code built by XOR-ing the hashes of the class, the client and the
# name's string form, i.e. the same components that #== compares.
#
# @return [Integer] the combined hash code
def hash
  [self.class, client, name.to_s].map(&:hash).reduce(:^)
end

#heartbeat ⇒ Object



64
65
66
# File 'lib/reqless/queue.rb', line 64

# Reads this queue's :heartbeat setting through get_config.
#
# @return whatever get_config returns for :heartbeat
def heartbeat
  get_config(:heartbeat)
end

#heartbeat=(value) ⇒ Object



68
69
70
# File 'lib/reqless/queue.rb', line 68

# Writes this queue's :heartbeat setting through set_config.
#
# @param value the new heartbeat value, passed to set_config unchanged
def heartbeat=(value)
  set_config(:heartbeat, value)
end

#jobs ⇒ Object



56
57
58
# File 'lib/reqless/queue.rb', line 56

# Returns the QueueJobs accessor for this queue, building it on first use
# and reusing the same instance afterwards.
#
# @return [QueueJobs] the memoized accessor
def jobs
  return @jobs if @jobs

  @jobs = QueueJobs.new(@name, @client)
end

#length ⇒ Object

How many items in the queue?



150
151
152
153
154
155
156
# File 'lib/reqless/queue.rb', line 150

# How many items are in the queue?
#
# Issues three zcard calls inside one redis `multi` block, for the
# "-locks", "-work" and "-scheduled" keys of this queue, and adds up the
# results.
#
# @return the sum of the three zcard results
def length
  cardinalities = @client.redis.multi do |pipeline|
    pipeline.zcard("ql:q:#{@name}-locks")
    pipeline.zcard("ql:q:#{@name}-work")
    pipeline.zcard("ql:q:#{@name}-scheduled")
  end
  cardinalities.sum
end

#pause(opts = {}) ⇒ Object



80
81
82
83
# File 'lib/reqless/queue.rb', line 80

# Pauses this queue and, when requested, also times out its running jobs.
#
# @param opts [Hash] options
# @option opts :stopjobs when truthy, additionally calls 'job.timeout'
#   with the result of jobs.running(0, -1)
# @return nil when jobs are not stopped, otherwise the 'job.timeout' reply
def pause(opts = {})
  @client.call('queue.pause', name)
  # Test truthiness rather than nil-ness, so an explicit `stopjobs: false`
  # leaves running jobs alone instead of timing them out.
  @client.call('job.timeout', jobs.running(0, -1)) if opts[:stopjobs]
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/reqless/queue.rb', line 76

# Reports the 'paused' entry of this queue's counts.
#
# @return the value stored under 'paused' in #counts
def paused?
  queue_counts = counts
  queue_counts['paused']
end

#peek(offset_or_count = nil, count = nil) ⇒ Object

Peek at a work item



136
137
138
139
140
141
142
143
# File 'lib/reqless/queue.rb', line 136

# Peek at work items without popping them.
#
# Supported call forms:
#   peek                -> a single Job (or nil when the reply is empty)
#   peek(count)         -> array of jobs, requested from offset 0
#   peek(offset, count) -> array of jobs, requested from `offset`
#   peek(nil, count)    -> same as peek(count)
#
# @param offset_or_count the count when given alone, otherwise the offset
# @param count how many jobs to request when an offset is also given
# @return [Job, Array<Job>, nil] a single job only when called with no
#   arguments; an array otherwise
def peek(offset_or_count = nil, count = nil)
  return_single_job = offset_or_count.nil? && count.nil?

  if count
    # An explicit count always wins; a missing offset means "from the start".
    actual_offset = offset_or_count || 0
    actual_count = count
  else
    actual_offset = 0
    actual_count = offset_or_count || 1
  end

  reply = @client.call('queue.peek', @name, actual_offset, actual_count)
  peeked = JSON.parse(reply).map { |job_data| Job.new(@client, job_data) }
  return_single_job ? peeked[0] : peeked
end

#pop(count = nil) ⇒ Object

Pop a work item off the queue



129
130
131
132
133
# File 'lib/reqless/queue.rb', line 129

# Pop work off the queue on behalf of this client's worker name.
#
# @param count how many jobs to request; when nil, one job is requested
# @return [Job, Array<Job>, nil] a single job (or nil) when count is nil,
#   otherwise an array of jobs
def pop(count = nil)
  limit = count || 1
  reply = @client.call('queue.pop', @name, worker_name, limit)
  popped = JSON.parse(reply).map { |job_data| Job.new(@client, job_data) }
  return popped.first if count.nil?

  popped
end

#put(klass, data, opts = {}) ⇒ Object

Put the described job in this queue. Options include:

> priority (int)

> tags (array of strings)

> delay (int)

> throttles (array of strings)



95
96
97
98
99
100
101
102
103
104
# File 'lib/reqless/queue.rb', line 95

# Put the described job in this queue.
#
# The options are first passed through job_options; the resulting :jid is
# used when present, otherwise a new one comes from Reqless.generate_jid.
# The remaining arguments are produced by Job.build_opts_array from the
# options merged with the job data.
#
# @param klass [String, #name] the job class, or its name as a String
# @param data the job payload
# @param opts [Hash] job options (e.g. priority, tags, delay, throttles, jid)
# @return the reply of the 'queue.put' call
def put(klass, data, opts = {})
  opts = job_options(klass, data, opts)
  worker = worker_name
  jid = opts[:jid] || Reqless.generate_jid
  klass_name =
    if klass.is_a?(String)
      klass
    else
      klass.name
    end
  job_args = Job.build_opts_array(opts.merge(:data => data))
  @client.call('queue.put', worker, @name, jid, klass_name, *job_args)
end

#recur(klass, data, interval, opts = {}) ⇒ Object

Make a recurring job in this queue. Options include:

> priority (int)

> tags (array of strings)

> retries (int)

> offset (int)



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/reqless/queue.rb', line 112

# Make a recurring job in this queue.
#
# The options are first passed through job_options. Recognised keys, with
# the defaults applied here: :jid (generated when absent), :offset (0),
# :priority (0), :tags ([]), :retries (5) and :backlog (0).
#
# @param klass [String, #name] the job class, or its name as a String
# @param data the job payload, serialized with JSON.generate
# @param interval the recurrence interval, passed through unchanged
# @param opts [Hash] recurring-job options
# @return the reply of the 'queue.recurAtInterval' call
def recur(klass, data, interval, opts = {})
  opts = job_options(klass, data, opts)
  jid = opts[:jid] || Reqless.generate_jid
  klass_name = klass.is_a?(String) ? klass : klass.name
  payload = JSON.generate(data)
  offset = opts.fetch(:offset, 0)
  settings = [
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'backlog', opts.fetch(:backlog, 0),
  ]
  @client.call(
    'queue.recurAtInterval', @name, jid, klass_name, payload, interval, offset, *settings
  )
end

#stats(date = nil) ⇒ Object



145
146
147
# File 'lib/reqless/queue.rb', line 145

# Fetches this queue's 'queue.stats' and returns the decoded JSON reply.
#
# @param date the date argument sent with the call; when nil, the current
#   time as a float (Time.now.to_f) is sent instead
# @return the parsed JSON structure returned by the 'queue.stats' call
def stats(date = nil)
  timestamp = date || Time.now.to_f
  reply = @client.call('queue.stats', @name, timestamp)
  JSON.parse(reply)
end

#throttle ⇒ Object



72
73
74
# File 'lib/reqless/queue.rb', line 72

# Returns the Reqless::Throttle for this queue, named "ql:q:<queue name>",
# building it on first use and reusing the same instance afterwards.
#
# @return [Reqless::Throttle] the memoized throttle
def throttle
  return @throttle if @throttle

  @throttle = Reqless::Throttle.new("ql:q:#{name}", client)
end

#to_s ⇒ Object Also known as: inspect



158
159
160
# File 'lib/reqless/queue.rb', line 158

def to_s
  "#<Reqless::Queue #{@name}>"
end

#unpause ⇒ Object



85
86
87
# File 'lib/reqless/queue.rb', line 85

# Unpauses this queue by sending 'queue.unpause' with the queue's name.
#
# @return the reply of the 'queue.unpause' call
def unpause
  queue_name = name
  @client.call('queue.unpause', queue_name)
end

#worker_name ⇒ Object

Our worker name is the same as our client’s



52
53
54
# File 'lib/reqless/queue.rb', line 52

# Our worker name is the same as our client's: this forwards to
# @client.worker_name and returns its result.
def worker_name
  @client.worker_name
end