Class: Resque::Plugins::Fifo::Worker
- Inherits:
-
Worker
- Object
- Worker
- Resque::Plugins::Fifo::Worker
- Defined in:
- lib/resque/plugins/fifo/worker.rb
Constant Summary collapse
- UPDATE_DELAY =
10
Instance Attribute Summary collapse
-
#main_queue_name ⇒ Object
Returns the value of attribute main_queue_name.
Instance Method Summary collapse
- #queues=(queues) ⇒ Object
-
#register_worker ⇒ Object
Registers ourself as a worker.
-
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues.
- #unregister_worker(exception = nil) ⇒ Object
Instance Attribute Details
#main_queue_name ⇒ Object
Returns the value of attribute main_queue_name.
8 9 10 |
# File 'lib/resque/plugins/fifo/worker.rb', line 8 def main_queue_name @main_queue_name end |
Instance Method Details
#queues=(queues) ⇒ Object
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/resque/plugins/fifo/worker.rb', line 10 def queues=(queues) queues = queues.empty? ? (ENV["QUEUES"] || ENV['QUEUE']).to_s.split(',') : queues @main_queue_name = "#{manager.queue_prefix}-#{SecureRandom.hex(10)}" @queues = ([:fifo_refresh, main_queue_name] + queues).map { |queue| queue.to_s.strip } unless ['*', '?', '{', '}', '[', ']'].any? { |char| @queues.join.include?(char) } @static_queues = @queues.flatten.uniq end validate_queues end |
#register_worker ⇒ Object
Registers ourself as a worker. Useful when entering the worker lifecycle on startup.
51 52 53 54 55 56 |
# File 'lib/resque/plugins/fifo/worker.rb', line 51 def register_worker super puts "Fifo Startup - Updating worker list" manager.request_refresh end |
#reserve ⇒ Object
Attempts to grab a job off one of the provided queues. Returns nil if no job can be found.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/resque/plugins/fifo/worker.rb', line 23 def reserve queues.each do |queue| log_with_severity :debug, "Checking #{queue}" if job = Resque.reserve(queue) log_with_severity :debug, "Found job on #{queue}" if job.payload['enqueue_ts'] delay_ts = Time.now.to_i - job.payload['enqueue_ts'].to_i max_delay = Resque.redis.get("fifo-stats-max-delay") || 0 Resque.redis.incrby("fifo-stats-accumulated-delay", delay_ts) Resque.redis.incr("fifo-stats-accumulated-count") if (delay_ts > max_delay.to_i) Resque.redis.set("fifo-stats-max-delay", max_delay) end end return job end end nil rescue Exception => e log_with_severity :error, "Error reserving job: #{e.inspect}" log_with_severity :error, e.backtrace.join("\n") raise e end |
#unregister_worker(exception = nil) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/resque/plugins/fifo/worker.rb', line 58 def unregister_worker(exception = nil) super(exception) puts "Fifo Shutdown - Updating worker list" manager.request_refresh end |