Class: RabbitJobs::Worker
- Inherits:
-
Object
- Object
- RabbitJobs::Worker
- Defined in:
- lib/rabbit_jobs/worker.rb
Instance Attribute Summary collapse
-
#_channel ⇒ Object
Returns the value of attribute _channel.
-
#_connection ⇒ Object
Returns the value of attribute _connection.
-
#background ⇒ Object
Returns the value of attribute background.
-
#pidfile ⇒ Object
Returns the value of attribute pidfile.
-
#process_name ⇒ Object
Returns the value of attribute process_name.
-
#worker_pid ⇒ Object
Returns the value of attribute worker_pid.
Instance Method Summary collapse
- #amqp_channel ⇒ Object
- #amqp_connection ⇒ Object
- #cleanup ⇒ Object
-
#initialize(*queues) ⇒ Worker
constructor
Workers should be initialized with an array of string queue names.
- #process_message(metadata, payload) ⇒ Object
- #queue_name(routing_key) ⇒ Object
- #queue_params(routing_key) ⇒ Object
- #queues ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #startup ⇒ Object
-
#work(time = 0) ⇒ Object
Subscribes to queue and working on jobs.
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
61 62 63 64 65 66 67 |
# File 'lib/rabbit_jobs/worker.rb', line 61 def initialize(*queues) RJ.config.init_default_queue @queues = queues.map { |queue| queue.to_s.strip }.flatten.uniq if @queues == ['*'] || @queues.empty? @queues = RabbitJobs.config.routing_keys end end |
Instance Attribute Details
#_channel ⇒ Object
Returns the value of attribute _channel.
7 8 9 |
# File 'lib/rabbit_jobs/worker.rb', line 7 def _channel @_channel end |
#_connection ⇒ Object
Returns the value of attribute _connection.
7 8 9 |
# File 'lib/rabbit_jobs/worker.rb', line 7 def _connection @_connection end |
#background ⇒ Object
Returns the value of attribute background.
5 6 7 |
# File 'lib/rabbit_jobs/worker.rb', line 5 def background @background end |
#pidfile ⇒ Object
Returns the value of attribute pidfile.
5 6 7 |
# File 'lib/rabbit_jobs/worker.rb', line 5 def pidfile @pidfile end |
#process_name ⇒ Object
Returns the value of attribute process_name.
5 6 7 |
# File 'lib/rabbit_jobs/worker.rb', line 5 def process_name @process_name end |
#worker_pid ⇒ Object
Returns the value of attribute worker_pid.
5 6 7 |
# File 'lib/rabbit_jobs/worker.rb', line 5 def worker_pid @worker_pid end |
Instance Method Details
#amqp_channel ⇒ Object
13 14 15 |
# File 'lib/rabbit_jobs/worker.rb', line 13 def amqp_channel self._channel ||= AmqpHelper.create_channel(self._connection) end |
#amqp_connection ⇒ Object
9 10 11 |
# File 'lib/rabbit_jobs/worker.rb', line 9 def amqp_connection self._connection ||= AmqpHelper.prepare_connection(self._connection) end |
#cleanup ⇒ Object
17 18 19 |
# File 'lib/rabbit_jobs/worker.rb', line 17 def cleanup self._connection = self._channel = nil end |
#process_message(metadata, payload) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rabbit_jobs/worker.rb', line 21 def (, payload) job = RJ::Job.parse(payload) if job.is_a?(Symbol) # case @job # when :not_found # when :parsing_error # when :error # end else if job.expired? RJ.logger.warn "Job expired: #{job.to_ruby_string}" false else job.run_perform end end true end |
#queue_name(routing_key) ⇒ Object
42 43 44 |
# File 'lib/rabbit_jobs/worker.rb', line 42 def queue_name(routing_key) RJ.config.queue_name(routing_key) end |
#queue_params(routing_key) ⇒ Object
46 47 48 |
# File 'lib/rabbit_jobs/worker.rb', line 46 def queue_params(routing_key) RJ.config[:queues][routing_key] end |
#queues ⇒ Object
69 70 71 |
# File 'lib/rabbit_jobs/worker.rb', line 69 def queues @queues || [:default] end |
#shutdown ⇒ Object
150 151 152 |
# File 'lib/rabbit_jobs/worker.rb', line 150 def shutdown @shutdown = true end |
#shutdown! ⇒ Object
183 184 185 |
# File 'lib/rabbit_jobs/worker.rb', line 183 def shutdown! shutdown end |
#startup ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/rabbit_jobs/worker.rb', line 154 def startup # prune_dead_workers RabbitJobs::Util.check_pidfile(self.pidfile) if self.pidfile if self.background return false if self.worker_pid = fork # daemonize child process Process.daemon(true) end count = RJ._run_after_fork_callbacks self.worker_pid ||= Process.pid if self.pidfile File.open(self.pidfile, 'w') { |f| f << Process.pid } end $stdout.sync = true @shutdown = false Signal.trap('TERM') { shutdown } Signal.trap('INT') { shutdown! } true end |
#work(time = 0) ⇒ Object
Subscribes to queue and working on jobs
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/rabbit_jobs/worker.rb', line 74 def work(time = 0) return false unless startup $0 = self.process_name || "rj_worker (#{queues.join(', ')})" processed_count = 0 begin RJ.run do check_shutdown = Proc.new { if @shutdown RJ.stop RJ.logger.info "Processed jobs: #{processed_count}." RJ.logger.info "Stopped." File.delete(self.pidfile) if self.pidfile && File.exists?(self.pidfile) end } amqp_connection amqp_channel.prefetch(1) queues.each do |routing_key| routing_key = routing_key.to_sym amqp_channel.queue(queue_name(routing_key), queue_params(routing_key)) { |queue, declare_ok| explicit_ack = !!queue_params(routing_key)[:ack] RJ.logger.info "Subscribing to #{queue_name(routing_key)}" queue.subscribe(ack: explicit_ack) do |, payload| if RJ. begin processed_count += 1 if (, payload) rescue RJ.logger.warn "process_message failed. payload: #{payload.inspect}" RJ.logger.warn $!.inspect $!.backtrace.each {|l| RJ.logger.warn l} end .ack if explicit_ack else RJ.logger.warn "before_process_message hook failed, requeuing payload: #{payload.inspect}" .reject(requeue: true) if explicit_ack end check_shutdown.call end } end if time > 0 # for debugging EM.add_timer(time) do self.shutdown end end EM.add_periodic_timer(1) do check_shutdown.call end RJ.logger.info "Started." end rescue error = $! if RJ.logger begin RJ.logger.error [error., error.backtrace].flatten.join("\n") ensure abort(error.) end end end true end |