Class: Timberline::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/timberline/queue.rb

Overview

Queue is the heart and soul of Timberline, which makes sense considering that it’s a queueing library. This object represents a queue in redis (really just a list of strings) and is responsible for reading to the queue, writing from the queue, maintaining queue statistics, and managing other queue actions (like pausing and deleting).

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, opts = {}) ⇒ Queue

Build a new Queue object.

Parameters:

  • queue_name (String)

    the redis queue that this object should represent

  • opts (Hash) (defaults to: {})

    the options for creating this queue

Options Hash (opts):

  • :read_timeout (Integer)

    the read_timeout for this queue. defaults to 0 (which effectively disables the timeout).

  • :hidden (boolean)

    whether this queue should be hidden from Timberline’s #all_queues list. Defaults to false.

Raises:

  • (ArgumentError)

    if queue_name is not provided



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/timberline/queue.rb', line 27

def initialize(queue_name, opts = {})
  read_timeout = opts.fetch(:read_timeout, 0)
  hidden = opts.fetch(:hidden, false)
  if queue_name.nil?
    raise ArgumentError.new("Queue name must be provided.")
  end
  @queue_name = queue_name
  @read_timeout = read_timeout
  @redis = Timberline.redis
  unless hidden
    @redis.sadd "timberline_queue_names", queue_name
  end
end

Instance Attribute Details

#queue_nameString (readonly)

the name of this queue

Returns:

  • (String)

    the current value of queue_name



13
14
15
# File 'lib/timberline/queue.rb', line 13

def queue_name
  @queue_name
end

#read_timeoutInteger (readonly)

how long this queue should wait, in seconds, before determining that there isn’t anything to read off of the queue.

Returns:

  • (Integer)

    the current value of read_timeout



13
14
15
# File 'lib/timberline/queue.rb', line 13

def read_timeout
  @read_timeout
end

Instance Method Details

#add_error_stat(item) ⇒ Object

Stores an item from the queue that fatally errored so we can keep track of things like how many errors have occurred on this queue, etc.

Parameters:

  • item (Envelope)

    an item that fatally errored on this queue



226
227
228
# File 'lib/timberline/queue.rb', line 226

def add_error_stat(item)
  add_stat_for_key(attr("error_stats"), item)
end

#add_retry_stat(item) ⇒ Object

Stores an item from the queue that was retried so we can keep track of things like how many retries have been attempted on this queue, etc.

Parameters:

  • item (Envelope)

    an item that fatally errored on this queue



217
218
219
# File 'lib/timberline/queue.rb', line 217

def add_retry_stat(item)
  add_stat_for_key(attr("retry_stats"), item)
end

#add_success_stat(item) ⇒ Object

Stores a successfully processed queue item as a statistic so we can keep track of things like average execution time, number of successes, etc.

Parameters:

  • item (Envelope)

    an item that was processed successfully for this queue



236
237
238
239
240
# File 'lib/timberline/queue.rb', line 236

def add_success_stat(item)
  add_stat_for_key(attr("success_stats"), item)
rescue Exception => e
  $stderr.puts "Success Stat Error: #{e.inspect}, Item: #{item.inspect}"
end

#attr(key) ⇒ String

Given a key, create a string namespaced to this queue name. This method is used to keep redis keys tidy.

Returns:

  • (String)


124
125
126
# File 'lib/timberline/queue.rb', line 124

def attr(key)
  "#{@queue_name}:#{key}"
end

#average_execution_timeFloat

Given all of the successful jobs that were executed in the last

stat_timeout

minutes, determine how long on average those jobs

took to execute.

Returns:

  • (Float)

    the average execution time for successful jobs in the last

    stat_timeout

    minutes.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/timberline/queue.rb', line 162

def average_execution_time
  successes = Timberline.redis.xmembers(attr("success_stats")).map { |item| Envelope.from_json(item)}
  times = successes.map do |item|
    if item.finished_processing_at
      item.finished_processing_at.to_f - item.started_processing_at.to_f
    elsif item.fatal_error_at
      item.fatal_error_at.to_f - item.started_processing_at.to_f
    else
      nil
    end
  end
  times.reject! { |t| t.nil? }
  if times.size == 0
    0
  else
    times.inject(0, :+) / times.size.to_f
  end
end

#deleteObject

Delete this queue, removing it from redis and all other references to it from Timberline.

Returns:

  • as Redis#srem



46
47
48
49
50
51
52
# File 'lib/timberline/queue.rb', line 46

def delete
  @redis.del @queue_name
  @redis.keys("#{@queue_name}:*").each do |key|
    @redis.del key
  end
  @redis.srem "timberline_queue_names", @queue_name
end

#error_item(item) ⇒ Object

Given an item that errored out in processing, add any appropriate metadata about the error, track it as a statistic, and push it onto the error queue.

Parameters:

  • item (Envelope)

    an item that has fatally errored



206
207
208
209
210
# File 'lib/timberline/queue.rb', line 206

def error_item(item)
  item.fatal_error_at = Time.now.to_f
  add_error_stat(item)
  self.error_queue.push(item)
end

#error_queueTimberline::Queue

Returns a (hidden) Queue object where this queue’s errors are pushed.

Returns:

  • (Timberline::Queue)

    a (hidden) Queue object where this queue’s errors are pushed.



245
246
247
# File 'lib/timberline/queue.rb', line 245

def error_queue
  @error_queue ||= Timberline.queue(attr("errors"), hidden: true)
end

#lengthInteger

The current number of items on the queue waiting to be processed.

Returns:

  • (Integer)


58
59
60
# File 'lib/timberline/queue.rb', line 58

def length
  @redis.llen @queue_name
end

#number_errorsInteger

The number of items that have encountered fatal errors on the queue during the last [stat_timeout] minutes.

Returns:

  • (Integer)


133
134
135
# File 'lib/timberline/queue.rb', line 133

def number_errors
  Timberline.redis.xcard attr("error_stats")
end

#number_retriesInteger

The number of items that have been retried on the queue during the last [stat_timeout] minutes.

Returns:

  • (Integer)


142
143
144
# File 'lib/timberline/queue.rb', line 142

def number_retries
  Timberline.redis.xcard attr("retry_stats")
end

#number_successesInteger

The number of items that were processed successfully for this queue during the last [stat_timeout] minutes.

Returns:

  • (Integer)


151
152
153
# File 'lib/timberline/queue.rb', line 151

def number_successes
  Timberline.redis.xcard attr("success_stats")
end

#pauseObject

Puts this queue into paused mode.

See Also:



101
102
103
# File 'lib/timberline/queue.rb', line 101

def pause
  @redis.set(attr("paused"), "true")
end

#paused?boolean

Indicates whether or not this queue is currently in paused mode.

Returns:

  • (boolean)


115
116
117
# File 'lib/timberline/queue.rb', line 115

def paused?
  @redis.get(attr("paused")) == "true"
end

#popTimberline::Envelope

Uses a blocking read from redis to pull the next item off the queue. If the queue is paused, this method will block until the queue is unpaused, at which point it will move on to the blocking read.

Returns:

  • (Timberline::Envelope)

    the Envelope representation of the item that was pulled off the queue, or nil if the read timed out.



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/timberline/queue.rb', line 69

def pop
  block_while_paused

  br_tuple = @redis.brpop(@queue_name, read_timeout)
  envelope_string = br_tuple.nil? ? nil : br_tuple[1]
  if envelope_string.nil?
    nil
  else
    Envelope.from_json(envelope_string)
  end
end

#push(contents, metadata = {}) ⇒ Object

Pushes the specified data onto the queue.

contents.

Parameters:

  • contents (#to_json, Timberline::Envelope)

    either contents that can be converted to JSON and stuffed in an Envelope, or an Envelope itself that needs to be put on the queue.

  • metadata (Hash) (defaults to: {})

    metadata that will be attached to the envelope for



89
90
91
92
93
94
95
96
# File 'lib/timberline/queue.rb', line 89

def push(contents,  = {})
  case contents
  when Envelope
    @redis.lpush @queue_name, contents
  else
    @redis.lpush @queue_name, wrap(contents, )
  end
end

#retry_item(item) ⇒ Object

Given an item that needs to be retried, increment the retry count, add any appropriate metadata about the retry, and push it back onto the queue. If the item has already been retried the maximum number of times, pass it on to error_item instead.

Parameters:

  • item (Envelope)

    an item that needs to be retried

See Also:



190
191
192
193
194
195
196
197
198
199
# File 'lib/timberline/queue.rb', line 190

def retry_item(item)
  if (item.retries < Timberline.max_retries)
    item.retries += 1
    item.last_tried_at = Time.now.to_f
    add_retry_stat(item)
    push(item)
  else
    error_item(item)
  end
end

#unpauseObject

Takes this queue back out of paused mode.

See Also:



108
109
110
# File 'lib/timberline/queue.rb', line 108

def unpause
  @redis.set(attr("paused"), "false")
end