Class: Qless::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/qless/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, client) ⇒ Queue

Returns a new instance of Queue.



37
38
39
40
41
# File 'lib/qless/queue.rb', line 37

# Build a queue handle.
#
# Stores the queue name and the client, then seeds the worker name from
# +Qless.worker_name+ through the +worker_name=+ writer.
#
# @param name [Object] the queue's name (stored as given)
# @param client [Object] the client used to talk to the backend
def initialize(name, client)
  @name = name
  @client = client
  self.worker_name = Qless.worker_name
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



34
35
36
# File 'lib/qless/queue.rb', line 34

# Reader for the +@client+ instance variable.
#
# @return [Object] the current value of +@client+
def client
  @client
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



34
35
36
# File 'lib/qless/queue.rb', line 34

# Reader for the +@name+ instance variable.
#
# @return [Object] the current value of +@name+
def name
  @name
end

#worker_name ⇒ Object

Returns the value of attribute worker_name.



35
36
37
# File 'lib/qless/queue.rb', line 35

# Reader for the +@worker_name+ instance variable.
#
# @return [Object] the current value of +@worker_name+
def worker_name
  @worker_name
end

Instance Method Details

#counts ⇒ Object



47
48
49
# File 'lib/qless/queue.rb', line 47

# Fetch the counts reported for this queue.
#
# Calls the client's +_queues+ script with no keys and
# +[current integer time, queue name]+ as arguments, then decodes the
# JSON reply.
#
# @return [Object] the parsed JSON structure returned by the script
def counts
  now = Time.now.to_i
  reply = @client._queues.call([], [now, @name])
  JSON.parse(reply)
end

#heartbeat ⇒ Object



51
52
53
# File 'lib/qless/queue.rb', line 51

# Read this queue's +:heartbeat+ setting.
#
# @return [Object] whatever +get_config+ returns for +:heartbeat+
def heartbeat
  get_config(:heartbeat)
end

#heartbeat=(value) ⇒ Object



55
56
57
# File 'lib/qless/queue.rb', line 55

# Write this queue's +:heartbeat+ setting.
#
# @param value [Object] the value handed to +set_config+ for +:heartbeat+
def heartbeat=(value)
  set_config(:heartbeat, value)
end

#jobs ⇒ Object



43
44
45
# File 'lib/qless/queue.rb', line 43

# Accessor for this queue's +QueueJobs+ helper.
#
# The helper is built from the queue name and client on first use and
# cached in +@jobs+; later calls return the same object.
#
# @return [QueueJobs]
def jobs
  return @jobs if @jobs

  @jobs = QueueJobs.new(@name, @client)
end

#length ⇒ Object

How many items in the queue?



137
138
139
140
141
142
143
# File 'lib/qless/queue.rb', line 137

# How many items are in the queue?
#
# Sums the cardinalities of the queue's three sorted sets (locks, work and
# scheduled), read together inside one MULTI transaction.
#
# The commands are issued on the object yielded to the +multi+ block, not
# on +@client.redis+ itself. Newer redis-rb releases only queue commands
# sent to the yielded object; commands sent to the outer connection run
# outside the transaction, so +multi+ would return no counts.
#
# @return [Integer] total number of entries across the three sets
def length
  sizes = @client.redis.multi do |transaction|
    transaction.zcard("ql:q:#{@name}-locks")
    transaction.zcard("ql:q:#{@name}-work")
    transaction.zcard("ql:q:#{@name}-scheduled")
  end
  sizes.inject(0, :+)
end

#max_concurrency ⇒ Object



59
60
61
62
# File 'lib/qless/queue.rb', line 59

# Read this queue's +max-concurrency+ setting as an Integer.
#
# @return [Integer, nil, false] the setting converted with +Integer()+, or
#   the falsy value from +get_config+ unchanged when nothing is set
# @raise [ArgumentError] if the stored value is not a valid integer string
def max_concurrency
  raw = get_config(:"max-concurrency")
  return raw unless raw

  Integer(raw)
end

#max_concurrency=(value) ⇒ Object



64
65
66
# File 'lib/qless/queue.rb', line 64

# Write this queue's +max-concurrency+ setting.
#
# @param value [Object] the value handed to +set_config+
def max_concurrency=(value)
  set_config(:"max-concurrency", value)
end

#pause ⇒ Object



68
69
70
# File 'lib/qless/queue.rb', line 68

# Invoke the client's +_pause+ script for this queue.
#
# The script is called with no keys and the queue name as its only
# argument.
#
# @return [Object] whatever the script call returns
def pause
  script = @client._pause
  script.call([], [name])
end

#peek(count = nil) ⇒ Object

Peek at a work item



127
128
129
130
# File 'lib/qless/queue.rb', line 127

# Peek at one or more work items.
#
# Calls the client's +_peek+ script with the queue name as key and
# +[count (default 1), current float time]+ as arguments, and wraps every
# JSON-encoded entry of the reply in a +Job+.
#
# @param count [Integer, nil] how many items to ask for
# @return [Job, nil] the first job (or nil if none) when +count+ is nil
# @return [Array<Job>] all returned jobs when +count+ is given
def peek(count = nil)
  limit = count || 1
  payloads = @client._peek.call([@name], [limit, Time.now.to_f])
  peeked = payloads.map do |payload|
    Job.new(@client, JSON.parse(payload))
  end

  if count.nil?
    peeked[0]
  else
    peeked
  end
end

#pop(count = nil) ⇒ Object

Pop a work item off the queue



121
122
123
124
# File 'lib/qless/queue.rb', line 121

# Pop one or more work items off the queue.
#
# Calls the client's +_pop+ script with the queue name as key and
# +[worker_name, count (default 1), current float time]+ as arguments, and
# wraps every JSON-encoded entry of the reply in a +Job+.
#
# @param count [Integer, nil] how many items to ask for
# @return [Job, nil] the first job (or nil if none) when +count+ is nil
# @return [Array<Job>] all returned jobs when +count+ is given
def pop(count = nil)
  limit = count || 1
  payloads = @client._pop.call([@name], [worker_name, limit, Time.now.to_f])
  popped = payloads.map do |payload|
    Job.new(@client, JSON.parse(payload))
  end

  if count.nil?
    popped[0]
  else
    popped
  end
end

#put(klass, data, opts = {}) ⇒ Object

Put the described job in this queue. Options include:

> priority (int)

> tags (array of strings)

> delay (int)



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/qless/queue.rb', line 81

# Put the described job in this queue.
#
# +klass+ may be a class/module, or a String that already holds the class
# name. +data+ is sent JSON-encoded.
#
# Options (read after +opts+ has been passed through +job_options+):
# => jid      (job id; Qless.generate_jid is used when absent)
# => delay    (int, default 0)
# => priority (int, default 0)
# => tags     (array of strings, default [])
# => retries  (int, default 5)
# => depends  (array, default [])
#
# @return [Object] whatever the client's +_put+ script call returns
def put(klass, data, opts = {})
  opts = job_options(klass, data, opts)

  # A String has no #name, so calling it unconditionally would raise
  # NoMethodError; #recur already tolerates string class names via #to_s.
  klass_name = klass.is_a?(String) ? klass : klass.name

  @client._put.call([@name], [
    (opts[:jid] || Qless.generate_jid),
    klass_name,
    JSON.generate(data),
    Time.now.to_f,
    opts.fetch(:delay, 0),
    'priority', opts.fetch(:priority, 0),
    'tags', JSON.generate(opts.fetch(:tags, [])),
    'retries', opts.fetch(:retries, 5),
    'depends', JSON.generate(opts.fetch(:depends, []))
  ])
end

#recur(klass, data, interval, opts = {}) ⇒ Object

Make a recurring job in this queue. Options include:

> priority (int)

> tags (array of strings)

> retries (int)

> offset (int)



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/qless/queue.rb', line 103

# Make a recurring job in this queue.
#
# +klass+ is sent as +klass.to_s+ and +data+ is sent JSON-encoded.
#
# Options (read after +opts+ has been passed through +job_options+):
# => jid      (job id; Qless.generate_jid is used when absent)
# => offset   (int, default 0)
# => priority (int, default 0)
# => tags     (array of strings, default [])
# => retries  (int, default 5)
#
# @return [Object] whatever the client's +_recur+ script call returns
def recur(klass, data, interval, opts = {})
  opts = job_options(klass, data, opts)
  jid = opts[:jid] || Qless.generate_jid

  script_args = ['on', @name, jid, klass.to_s, JSON.generate(data), Time.now.to_f]
  script_args.push('interval', interval, opts.fetch(:offset, 0))
  script_args.push('priority', opts.fetch(:priority, 0))
  script_args.push('tags', JSON.generate(opts.fetch(:tags, [])))
  script_args.push('retries', opts.fetch(:retries, 5))

  @client._recur.call([], script_args)
end

#stats(date = nil) ⇒ Object



132
133
134
# File 'lib/qless/queue.rb', line 132

# Fetch statistics for this queue.
#
# Calls the client's +_stats+ script with no keys and +[queue name, date]+
# as arguments, then decodes the JSON reply.
#
# @param date [Object, nil] passed through to the script; the current
#   float time is used when nil
# @return [Object] the parsed JSON structure returned by the script
def stats(date = nil)
  at = date || Time.now.to_f
  reply = @client._stats.call([], [@name, at])
  JSON.parse(reply)
end

#to_s ⇒ Object Also known as: inspect



145
146
147
# File 'lib/qless/queue.rb', line 145

# Short human-readable representation: the literal class label
# "Qless::Queue" followed by the queue's name.
#
# @return [String]
def to_s
  "#<Qless::Queue #{@name}>"
end

#unpause ⇒ Object



72
73
74
# File 'lib/qless/queue.rb', line 72

# Invoke the client's +_unpause+ script for this queue.
#
# The script is called with no keys and the queue name as its only
# argument.
#
# @return [Object] whatever the script call returns
def unpause
  script = @client._unpause
  script.call([], [name])
end