Class: JobDispatch::Worker
- Inherits:
-
Object
- Object
- JobDispatch::Worker
- Defined in:
- lib/job_dispatch/worker.rb,
lib/job_dispatch/worker/item.rb,
lib/job_dispatch/worker/socket.rb
Overview
This class is the main worker loop. Run it as a whole process or just as a thread in a multi-threaded worker process.
Defined Under Namespace
Classes: Item, Socket
Constant Summary collapse
- IDLE_TIME =
3
- IDLE_COUNT =
10
Instance Attribute Summary collapse
-
#item_class ⇒ Object
readonly
Returns the value of attribute item_class.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Class Method Summary collapse
- .touch(timeout = nil) ⇒ Object
Instance Method Summary collapse
- #ask_for_work ⇒ Object
- #connect ⇒ Object
- #disconnect ⇒ Object
-
#initialize(connect_address, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #run ⇒ Object
- #running? ⇒ Boolean
- #send_goodbye ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(connect_address, options = {}) ⇒ Worker
Returns a new instance of Worker.
18 19 20 21 22 23 24 |
# File 'lib/job_dispatch/worker.rb', line 18 def initialize(connect_address, options={}) options ||= {} @connect_address = connect_address @queue = options[:queue] || 'default' @running = false @item_class = options[:item_class] || Worker::Item end |
Instance Attribute Details
#item_class ⇒ Object (readonly)
Returns the value of attribute item_class.
16 17 18 |
# File 'lib/job_dispatch/worker.rb', line 16 def item_class @item_class end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
15 16 17 |
# File 'lib/job_dispatch/worker.rb', line 15 def queue @queue end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
14 15 16 |
# File 'lib/job_dispatch/worker.rb', line 14 def socket @socket end |
Class Method Details
.touch(timeout = nil) ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/job_dispatch/worker.rb', line 92 def self.touch(timeout=nil) sock = Thread.current["JobDispatch::Worker.socket"] job_id = Thread.current["JobDispatch::Worker.job_id"] if sock && job_id sock.send_touch(job_id, timeout) end end |
Instance Method Details
#ask_for_work ⇒ Object
76 77 78 |
# File 'lib/job_dispatch/worker.rb', line 76 def ask_for_work socket.ask_for_work(queue) end |
#connect ⇒ Object
26 27 28 29 |
# File 'lib/job_dispatch/worker.rb', line 26 def connect @socket ||= Worker::Socket.new(@connect_address, item_class) Thread.current["JobDispatch::Worker.socket"] = @socket end |
#disconnect ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/job_dispatch/worker.rb', line 31 def disconnect if @socket @socket.close @socket = nil Thread.current["JobDispatch::Worker.socket"] = nil end end |
#run ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/job_dispatch/worker.rb', line 39 def run @running = true while running? puts "connecting" connect puts "asking for work" ask_for_work # if we are idle for too many times, the broker has restarted or gone away, and we will be stuck in receive # state, so we need to close the socket and make a new one to ask for work again. idle_count = 0 poller = ZMQ::Poller.new poller.register(socket.poll_item) while running? and idle_count < IDLE_COUNT begin poller.poll(IDLE_TIME) if poller.readables.include?(socket.socket) process idle_count = 0 else idle idle_count += 1 end rescue Interrupt puts "Worker stopping." JobDispatch.logger.info("Worker #{}") stop disconnect connect send_goodbye end end disconnect end end |
#running? ⇒ Boolean
84 85 86 |
# File 'lib/job_dispatch/worker.rb', line 84 def running? @running end |
#send_goodbye ⇒ Object
80 81 82 |
# File 'lib/job_dispatch/worker.rb', line 80 def send_goodbye socket.send_goodbye(queue) end |
#stop ⇒ Object
88 89 90 |
# File 'lib/job_dispatch/worker.rb', line 88 def stop @running = false end |