Class: Timberline::Queue
- Inherits: Object
  - Object
  - Timberline::Queue
- Defined in: lib/timberline/queue.rb
Overview
Queue is the heart and soul of Timberline, which makes sense considering that it’s a queueing library. This object represents a queue in redis (really just a list of strings) and is responsible for reading from the queue, writing to the queue, maintaining queue statistics, and managing other queue actions (like pausing and deleting).
Instance Attribute Summary
-
#queue_name ⇒ String
readonly
the name of this queue.
-
#read_timeout ⇒ Integer
readonly
how long this queue should wait, in seconds, before determining that there isn’t anything to read off of the queue.
Instance Method Summary
-
#add_error_stat(item) ⇒ Object
Stores an item from the queue that fatally errored so we can keep track of things like how many errors have occurred on this queue, etc.
-
#add_retry_stat(item) ⇒ Object
Stores an item from the queue that was retried so we can keep track of things like how many retries have been attempted on this queue, etc.
-
#add_success_stat(item) ⇒ Object
Stores a successfully processed queue item as a statistic so we can keep track of things like average execution time, number of successes, etc.
-
#attr(key) ⇒ String
Given a key, create a string namespaced to this queue name.
-
#average_execution_time ⇒ Float
Given all of the successful jobs that were executed in the last [stat_timeout] minutes, determine how long on average those jobs took to execute.
-
#delete ⇒ Object
Delete this queue, removing it from redis and all other references to it from Timberline.
-
#error_item(item) ⇒ Object
Given an item that errored out in processing, add any appropriate metadata about the error, track it as a statistic, and push it onto the error queue.
-
#error_queue ⇒ Timberline::Queue
A (hidden) Queue object where this queue’s errors are pushed.
-
#initialize(queue_name, opts = {}) ⇒ Queue
constructor
Build a new Queue object.
-
#length ⇒ Integer
The current number of items on the queue waiting to be processed.
-
#number_errors ⇒ Integer
The number of items that have encountered fatal errors on the queue during the last [stat_timeout] minutes.
-
#number_retries ⇒ Integer
The number of items that have been retried on the queue during the last [stat_timeout] minutes.
-
#number_successes ⇒ Integer
The number of items that were processed successfully for this queue during the last [stat_timeout] minutes.
-
#pause ⇒ Object
Puts this queue into paused mode.
-
#paused? ⇒ boolean
Indicates whether or not this queue is currently in paused mode.
-
#pop ⇒ Timberline::Envelope
Uses a blocking read from redis to pull the next item off the queue.
-
#push(contents, metadata = {}) ⇒ Object
Pushes the specified data onto the queue.
-
#retry_item(item) ⇒ Object
Given an item that needs to be retried, increment the retry count, add any appropriate metadata about the retry, and push it back onto the queue.
-
#unpause ⇒ Object
Takes this queue back out of paused mode.
Constructor Details
#initialize(queue_name, opts = {}) ⇒ Queue
Build a new Queue object.
# File 'lib/timberline/queue.rb', line 27
def initialize(queue_name, opts = {})
  read_timeout = opts.fetch(:read_timeout, 0)
  hidden = opts.fetch(:hidden, false)
  if queue_name.nil?
    raise ArgumentError.new("Queue name must be provided.")
  end
  @queue_name = queue_name
  @read_timeout = read_timeout
  @redis = Timberline.redis
  unless hidden
    @redis.sadd "timberline_queue_names", queue_name
  end
end
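The option handling in the constructor can be tried out without a Redis connection. The following is a minimal sketch, not Timberline code: `queue_settings` is a hypothetical helper that mirrors only the `opts.fetch` defaults and the nil check shown in the listing.

```ruby
# Hypothetical helper mirroring the option handling in Queue#initialize,
# without touching Redis.
def queue_settings(queue_name, opts = {})
  raise ArgumentError, "Queue name must be provided." if queue_name.nil?

  {
    queue_name: queue_name,
    read_timeout: opts.fetch(:read_timeout, 0), # default when not supplied
    hidden: opts.fetch(:hidden, false)          # hidden queues skip registration
  }
end

defaults = queue_settings("emails")
custom = queue_settings("emails", read_timeout: 5, hidden: true)
```

Here `defaults` carries a `read_timeout` of 0 and `hidden` set to false, while `custom` carries the supplied values.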
Instance Attribute Details
#queue_name ⇒ String (readonly)
the name of this queue
# File 'lib/timberline/queue.rb', line 13
def queue_name
  @queue_name
end
#read_timeout ⇒ Integer (readonly)
how long this queue should wait, in seconds, before determining that there isn’t anything to read off of the queue.
# File 'lib/timberline/queue.rb', line 13
def read_timeout
  @read_timeout
end
Instance Method Details
#add_error_stat(item) ⇒ Object
Stores an item from the queue that fatally errored so we can keep track of things like how many errors have occurred on this queue, etc.
# File 'lib/timberline/queue.rb', line 226
def add_error_stat(item)
  add_stat_for_key(attr("error_stats"), item)
end
#add_retry_stat(item) ⇒ Object
Stores an item from the queue that was retried so we can keep track of things like how many retries have been attempted on this queue, etc.
# File 'lib/timberline/queue.rb', line 217
def add_retry_stat(item)
  add_stat_for_key(attr("retry_stats"), item)
end
#add_success_stat(item) ⇒ Object
Stores a successfully processed queue item as a statistic so we can keep track of things like average execution time, number of successes, etc.
# File 'lib/timberline/queue.rb', line 236
def add_success_stat(item)
  add_stat_for_key(attr("success_stats"), item)
rescue Exception => e
  $stderr.puts "Success Stat Error: #{e.inspect}, Item: #{item.inspect}"
end
#attr(key) ⇒ String
Given a key, create a string namespaced to this queue name. This method is used to keep redis keys tidy.
# File 'lib/timberline/queue.rb', line 124
def attr(key)
  "#{@queue_name}:#{key}"
end
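To illustrate the namespacing, the sketch below reproduces the same string interpolation in a standalone helper. `namespaced_key` and the queue name `emails` are hypothetical and exist only for this example.

```ruby
# Standalone sketch of the "<queue_name>:<key>" convention used by Queue#attr.
# `namespaced_key` is a hypothetical helper, not part of Timberline.
def namespaced_key(queue_name, key)
  "#{queue_name}:#{key}"
end

paused_key = namespaced_key("emails", "paused")
stats_key  = namespaced_key("emails", "success_stats")
```

With these inputs, `paused_key` is `"emails:paused"` and `stats_key` is `"emails:success_stats"`.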
#average_execution_time ⇒ Float
Given all of the successful jobs that were executed in the last [stat_timeout] minutes, determine how long on average those jobs took to execute.
# File 'lib/timberline/queue.rb', line 162
def average_execution_time
  successes = Timberline.redis.xmembers(attr("success_stats")).map { |item| Envelope.from_json(item)}
  times = successes.map do |item|
    if item.finished_processing_at
      item.finished_processing_at.to_f - item.started_processing_at.to_f
    elsif item.fatal_error_at
      item.fatal_error_at.to_f - item.started_processing_at.to_f
    else
      nil
    end
  end
  times.reject! { |t| t.nil? }
  if times.size == 0
    0
  else
    times.inject(0, :+) / times.size.to_f
  end
end
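The averaging step can be followed in isolation. This is a rough sketch under stated assumptions: `Item` is a hypothetical stand-in for Timberline::Envelope that carries only the three timestamps read above, and `average_duration` is an illustrative name, not a Timberline method.

```ruby
# Hypothetical stand-in for Timberline::Envelope carrying only the timestamps
# that the averaging logic reads.
Item = Struct.new(:started_processing_at, :finished_processing_at, :fatal_error_at)

def average_duration(items)
  times = items.map do |item|
    # Prefer the finish time, fall back to the fatal error time.
    stop = item.finished_processing_at || item.fatal_error_at
    stop && (stop.to_f - item.started_processing_at.to_f)
  end.compact # drop items that have neither timestamp

  return 0 if times.empty?

  times.sum / times.size.to_f
end

items = [
  Item.new(10.0, 12.0, nil), # finished after 2 seconds
  Item.new(20.0, nil, 24.0), # fatally errored after 4 seconds
  Item.new(30.0, nil, nil)   # no end timestamp, so it is skipped
]
average = average_duration(items)
```

Only the first two items contribute, so `average` is 3.0; an empty list yields 0, matching the guard in the listing.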
#delete ⇒ Object
Delete this queue, removing it from redis and all other references to it from Timberline.
# File 'lib/timberline/queue.rb', line 46
def delete
  @redis.del @queue_name
  @redis.keys("#{@queue_name}:*").each do |key|
    @redis.del key
  end
  @redis.srem "timberline_queue_names", @queue_name
end
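The effect of the three steps (delete the list, delete the namespaced keys, remove the registry entry) can be sketched with in-memory stand-ins. Everything here is hypothetical scaffolding: a Hash plays the Redis keyspace, a Set plays `timberline_queue_names`, and a prefix check approximates the `queue_name:*` pattern.

```ruby
require "set"

# Hypothetical in-memory stand-ins: a Hash for the Redis keyspace and a Set
# for the "timberline_queue_names" registry.
keyspace = {
  "emails" => ["job-1"],
  "emails:paused" => "false",
  "emails:success_stats" => [],
  "emails_archive" => [],
  "reports" => []
}
queue_names = Set["emails", "reports"]

def delete_queue(name, keyspace, queue_names)
  keyspace.delete(name)                                       # the list itself
  keyspace.delete_if { |key, _| key.start_with?("#{name}:") } # namespaced keys
  queue_names.delete(name)                                    # registry entry
end

delete_queue("emails", keyspace, queue_names)
remaining = keyspace.keys
```

After the call, `remaining` holds only `"emails_archive"` and `"reports"`: a key that merely shares a prefix without the `:` separator is left alone.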
#error_item(item) ⇒ Object
Given an item that errored out in processing, add any appropriate metadata about the error, track it as a statistic, and push it onto the error queue.
# File 'lib/timberline/queue.rb', line 206
def error_item(item)
  item.fatal_error_at = Time.now.to_f
  add_error_stat(item)
  self.error_queue.push(item)
end
#error_queue ⇒ Timberline::Queue
Returns a (hidden) Queue object where this queue’s errors are pushed.
# File 'lib/timberline/queue.rb', line 245
def error_queue
  @error_queue ||= Timberline.queue(attr("errors"), hidden: true)
end
#length ⇒ Integer
The current number of items on the queue waiting to be processed.
# File 'lib/timberline/queue.rb', line 58
def length
  @redis.llen @queue_name
end
#number_errors ⇒ Integer
The number of items that have encountered fatal errors on the queue during the last [stat_timeout] minutes.
# File 'lib/timberline/queue.rb', line 133
def number_errors
  Timberline.redis.xcard attr("error_stats")
end
#number_retries ⇒ Integer
The number of items that have been retried on the queue during the last [stat_timeout] minutes.
# File 'lib/timberline/queue.rb', line 142
def number_retries
  Timberline.redis.xcard attr("retry_stats")
end
#number_successes ⇒ Integer
The number of items that were processed successfully for this queue during the last [stat_timeout] minutes.
# File 'lib/timberline/queue.rb', line 151
def number_successes
  Timberline.redis.xcard attr("success_stats")
end
#pause ⇒ Object
Puts this queue into paused mode.
# File 'lib/timberline/queue.rb', line 101
def pause
  @redis.set(attr("paused"), "true")
end
#paused? ⇒ boolean
Indicates whether or not this queue is currently in paused mode.
# File 'lib/timberline/queue.rb', line 115
def paused?
  @redis.get(attr("paused")) == "true"
end
#pop ⇒ Timberline::Envelope
Uses a blocking read from redis to pull the next item off the queue. If the queue is paused, this method will block until the queue is unpaused, at which point it will move on to the blocking read.
# File 'lib/timberline/queue.rb', line 69
def pop
  block_while_paused

  br_tuple = @redis.brpop(@queue_name, read_timeout)
  envelope_string = br_tuple.nil? ? nil : br_tuple[1]
  if envelope_string.nil?
    nil
  else
    Envelope.from_json(envelope_string)
  end
end
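The tuple handling in `pop` follows from the shape of a blocking right-pop reply: a two-element array of list name and element, or nil when the read times out. The sketch below imitates that shape with a hypothetical `FakeRedis` class that never blocks, and uses plain JSON parsing in place of `Envelope.from_json`; neither stand-in is part of Timberline.

```ruby
require "json"

# Hypothetical in-memory stand-in for the Redis client. It only mimics the
# return shape of BRPOP ([list_name, element] or nil) and never blocks.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  def lpush(key, value)
    @lists[key].unshift(value)
  end

  def brpop(key, _timeout)
    value = @lists[key].pop
    value.nil? ? nil : [key, value]
  end
end

# Unwraps the tuple the same way Queue#pop does, then parses the payload.
def pop_payload(redis, queue_name)
  tuple = redis.brpop(queue_name, 0)
  tuple.nil? ? nil : JSON.parse(tuple[1])
end

redis = FakeRedis.new
redis.lpush("emails", JSON.generate("to" => "a@example.com"))
first = pop_payload(redis, "emails")
second = pop_payload(redis, "emails")
```

The first call returns the parsed hash; the second finds the list empty and returns nil, which is the case `pop` guards against before building an Envelope.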
#push(contents, metadata = {}) ⇒ Object
Pushes the specified data onto the queue.
# File 'lib/timberline/queue.rb', line 89
def push(contents, metadata = {})
  case contents
  when Envelope
    @redis.lpush @queue_name, contents
  else
    @redis.lpush @queue_name, wrap(contents, metadata)
  end
end
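Because `push` adds to the head of the list and `pop` reads from the tail, items come back in the order they were pushed. A minimal sketch of that ordering, using a plain Array as a hypothetical stand-in for the Redis list:

```ruby
# An Array stands in for the Redis list: LPUSH adds at the head (unshift),
# and BRPOP removes from the tail (pop).
list = []
%w[first second third].each { |job| list.unshift(job) }

order = []
order << list.pop until list.empty?
```

After the loop, `order` holds the jobs in their original push order.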
#retry_item(item) ⇒ Object
Given an item that needs to be retried, increment the retry count, add any appropriate metadata about the retry, and push it back onto the queue. If the item has already been retried the maximum number of times, pass it on to error_item instead.
# File 'lib/timberline/queue.rb', line 190
def retry_item(item)
  if (item.retries < Timberline.max_retries)
    item.retries += 1
    item.last_tried_at = Time.now.to_f
    add_retry_stat(item)
    push(item)
  else
    error_item(item)
  end
end
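The retry-or-error decision can be traced with in-memory stand-ins. This sketch assumes a maximum of 2 retries; `MAX_RETRIES`, `Job`, and `route_failed_job` are hypothetical names for illustration, and plain Arrays replace the queue and the error queue.

```ruby
MAX_RETRIES = 2 # hypothetical stand-in for Timberline.max_retries

Job = Struct.new(:name, :retries)

# Returns :retried or :errored so the routing decision is easy to observe.
def route_failed_job(job, queue, error_queue)
  if job.retries < MAX_RETRIES
    job.retries += 1
    queue.unshift(job)       # back onto the main queue
    :retried
  else
    error_queue.unshift(job) # retries exhausted
    :errored
  end
end

queue = []
error_queue = []
job = Job.new("send_email", 0)
queue.unshift(job)

# Simulate the same job failing three times in a row.
outcomes = []
3.times do
  failed = queue.pop
  outcomes << route_failed_job(failed, queue, error_queue)
end
```

The job is retried twice and then lands on the error queue on its third failure, leaving the main queue empty.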
#unpause ⇒ Object
Takes this queue back out of paused mode.
# File 'lib/timberline/queue.rb', line 108
def unpause
  @redis.set(attr("paused"), "false")
end
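Taken together, `pause`, `paused?`, and `unpause` amount to a string flag stored under the queue's namespaced `paused` key. A minimal sketch of that flag, with a Hash as a hypothetical stand-in for Redis string keys and `emails` as an example queue name:

```ruby
store = {} # hypothetical stand-in for Redis string keys

# Mirrors the comparison in Queue#paused?: only the string "true" counts.
paused = ->(name) { store["#{name}:paused"] == "true" }

before = paused.call("emails")   # key was never set
store["emails:paused"] = "true"  # what pause writes
during = paused.call("emails")
store["emails:paused"] = "false" # what unpause writes
after = paused.call("emails")
```

A queue whose flag was never written reads as not paused, because a missing key does not equal `"true"`.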