Class: Rails::Queue::ThreadedQueueConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/rails/queue/queue.rb

Overview

The threaded consumer will run jobs in a background thread in development mode or in a VM where running jobs on a thread in production mode makes sense.

When the process exits, the consumer pushes a nil onto the queue and joins the thread, which will ensure that all jobs are executed before the process finally dies.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue, options = {}) ⇒ ThreadedQueueConsumer

Returns a new instance of ThreadedQueueConsumer.



72
73
74
75
76
# File 'lib/rails/queue/queue.rb', line 72

def initialize(queue, options = {})
  @queue = queue
  @logger = options[:logger]
  @fallback_logger = Logger.new($stderr)
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



70
71
72
# File 'lib/rails/queue/queue.rb', line 70

def logger
  @logger
end

Instance Method Details

#consumeObject



92
93
94
95
96
# File 'lib/rails/queue/queue.rb', line 92

def consume
  while job = @queue.pop
    run job
  end
end

#drainObject



88
89
90
# File 'lib/rails/queue/queue.rb', line 88

def drain
  @queue.pop.run until @queue.empty?
end

#handle_exception(job, exception) ⇒ Object



104
105
106
# File 'lib/rails/queue/queue.rb', line 104

def handle_exception(job, exception)
  (logger || @fallback_logger).error "Job Error: #{job.inspect}\n#{exception.message}\n#{exception.backtrace.join("\n")}"
end

#run(job) ⇒ Object



98
99
100
101
102
# File 'lib/rails/queue/queue.rb', line 98

def run(job)
  job.run
rescue Exception => exception
  handle_exception job, exception
end

#shutdownObject



83
84
85
86
# File 'lib/rails/queue/queue.rb', line 83

def shutdown
  @queue.push nil
  @thread.join
end

#startObject



78
79
80
81
# File 'lib/rails/queue/queue.rb', line 78

def start
  @thread = Thread.new { consume }
  self
end