Class: QueueingRabbit::Worker
- Inherits:
-
Object
- Object
- QueueingRabbit::Worker
- Includes:
- Logging
- Defined in:
- lib/queueing_rabbit/worker.rb
Defined Under Namespace
Classes: WorkerError
Instance Attribute Summary collapse
-
#concurrency ⇒ Object
readonly
Returns the value of attribute concurrency.
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#mutex_pool ⇒ Object
readonly
Returns the value of attribute mutex_pool.
Instance Method Summary collapse
-
#initialize(jobs, concurrency = nil) ⇒ Worker
constructor
A new instance of Worker.
- #invoke_job(job, payload, metadata) ⇒ Object
- #pid ⇒ Object
- #pidfile_exists? ⇒ Boolean
- #read_pidfile ⇒ Object
- #remove_pidfile ⇒ Object
- #stop(connection = QueueingRabbit.connection, graceful = false) ⇒ Object
- #to_s ⇒ Object
- #use_pidfile(filename) ⇒ Object
- #work ⇒ Object
- #work! ⇒ Object
- #working? ⇒ Boolean
Constructor Details
#initialize(jobs, concurrency = nil) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 |
# File 'lib/queueing_rabbit/worker.rb', line 12 def initialize(jobs, concurrency = nil) @jobs = jobs.map { |job| job.to_s.strip }.reject { |job| job.empty? } @concurrency = concurrency || @jobs.count @mutex_pool = ::MutexPool.new(@concurrency) sync_stdio validate_jobs constantize_jobs end |
Instance Attribute Details
#concurrency ⇒ Object (readonly)
Returns the value of attribute concurrency.
10 11 12 |
# File 'lib/queueing_rabbit/worker.rb', line 10 def concurrency @concurrency end |
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
10 11 12 |
# File 'lib/queueing_rabbit/worker.rb', line 10 def jobs @jobs end |
#mutex_pool ⇒ Object (readonly)
Returns the value of attribute mutex_pool.
10 11 12 |
# File 'lib/queueing_rabbit/worker.rb', line 10 def mutex_pool @mutex_pool end |
Instance Method Details
#invoke_job(job, payload, metadata) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/queueing_rabbit/worker.rb', line 92 def invoke_job(job, payload, ) info "performing job #{job}" if job.respond_to?(:perform) job.perform(payload, ) elsif job <= QueueingRabbit::AbstractJob job.new(payload, ).perform else error "don't know how to perform job #{job}" end rescue => e QueueingRabbit.trigger_event(:consumer_error, e) error "unexpected error #{e.class} occured: #{e.}" debug e end |
#pid ⇒ Object
61 62 63 |
# File 'lib/queueing_rabbit/worker.rb', line 61 def pid Process.pid end |
#pidfile_exists? ⇒ Boolean
57 58 59 |
# File 'lib/queueing_rabbit/worker.rb', line 57 def pidfile_exists? @pidfile && File.exists?(@pidfile) end |
#read_pidfile ⇒ Object
53 54 55 |
# File 'lib/queueing_rabbit/worker.rb', line 53 def read_pidfile File.read(@pidfile).to_i if pidfile_exists? end |
#remove_pidfile ⇒ Object
49 50 51 |
# File 'lib/queueing_rabbit/worker.rb', line 49 def remove_pidfile File.delete(@pidfile) if pidfile_exists? end |
#stop(connection = QueueingRabbit.connection, graceful = false) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/queueing_rabbit/worker.rb', line 69 def stop(connection = QueueingRabbit.connection, graceful = false) connection.next_tick do begin @working = false if graceful Timeout.timeout(QueueingRabbit.jobs_wait_timeout) { @mutex_pool.lock } QueueingRabbit.trigger_event(:consuming_done) info "gracefully shutting down the worker #{self}" end rescue Timeout::Error error "a timeout (> #{QueueingRabbit.jobs_wait_timeout}s) when trying to gracefully shut down the worker " \ "#{self}" rescue => e error "a #{e.class} error occurred when trying to shut down the worker #{self}" debug e ensure connection.close do remove_pidfile end end end end |
#to_s ⇒ Object
65 66 67 |
# File 'lib/queueing_rabbit/worker.rb', line 65 def to_s "PID=#{pid}, JOBS=#{jobs.join(',')} CONCURRENCY=#{@concurrency}" end |
#use_pidfile(filename) ⇒ Object
43 44 45 46 47 |
# File 'lib/queueing_rabbit/worker.rb', line 43 def use_pidfile(filename) @pidfile = filename cleanup_pidfile File.open(@pidfile, 'w') { |f| f << pid } end |
#work ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/queueing_rabbit/worker.rb', line 26 def work return if working? @working = true QueueingRabbit.trigger_event(:worker_ready) jobs.each { |job| run_job(QueueingRabbit.connection, job) } QueueingRabbit.trigger_event(:consuming_started) end |
#work! ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/queueing_rabbit/worker.rb', line 35 def work! return if working? trap_signals info "starting a new queueing_rabbit worker #{self}" QueueingRabbit.begin_worker_loop { work } end |
#working? ⇒ Boolean
22 23 24 |
# File 'lib/queueing_rabbit/worker.rb', line 22 def working? @working end |