Class: JobDispatch::Worker::Socket
- Inherits:
-
Object
- Object
- JobDispatch::Worker::Socket
- Defined in:
- lib/job_dispatch/worker/socket.rb
Instance Attribute Summary collapse
-
#item_class ⇒ Object
readonly
Returns the value of attribute item_class.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Instance Method Summary collapse
- #ask_for_work(queue) ⇒ Object
- #close ⇒ Object
- #identity ⇒ Object
-
#initialize(connect_address, item_klass) ⇒ Socket
constructor
A new instance of Socket.
- #poll_item ⇒ Object
-
#read_item ⇒ JobDispatch::Item
Read an incoming message.
- #send_goodbye(queue) ⇒ Object
-
#send_response(job_id, status, result) ⇒ Object
After execution, send the response.
- #send_touch(job_id, timeout = nil) ⇒ Object
Constructor Details
#initialize(connect_address, item_klass) ⇒ Socket
Returns a new instance of Socket.
13 14 15 16 17 |
# File 'lib/job_dispatch/worker/socket.rb', line 13
#
# Creates a ZMQ::REQ socket from JobDispatch.context, connects it to
# +connect_address+ and remembers the class used to build incoming items.
#
# @param connect_address [Object] address passed to the socket's +connect+
#   (presumably a ZMQ endpoint string -- confirm against callers)
# @param item_klass [Class] class that #read_item instantiates via +new+
def initialize(connect_address, item_klass)
  @socket = JobDispatch.context.socket(ZMQ::REQ).tap do |sock|
    sock.connect(connect_address)
  end
  @item_class = item_klass
end
Instance Attribute Details
#item_class ⇒ Object (readonly)
Returns the value of attribute item_class.
11 12 13 |
# File 'lib/job_dispatch/worker/socket.rb', line 11
#
# Reader for the +@item_class+ instance variable.
#
# @return [Object] the current value of +@item_class+
def item_class
  @item_class
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
10 11 12 |
# File 'lib/job_dispatch/worker/socket.rb', line 10
#
# Reader for the +@socket+ instance variable.
#
# @return [Object] the current value of +@socket+
def socket
  @socket
end
Instance Method Details
#ask_for_work(queue) ⇒ Object
23 24 25 |
# File 'lib/job_dispatch/worker/socket.rb', line 23
#
# Sends a JSON-encoded 'ready' command carrying the queue and this
# worker's #identity over the socket.
#
# @param queue [Object] value sent under the +queue+ key
# @return [Object] whatever the socket's +send+ returns
def ask_for_work(queue)
  request = { command: 'ready', queue: queue, worker_name: identity }
  @socket.send(JSON.dump(request))
end
#close ⇒ Object
31 32 33 |
# File 'lib/job_dispatch/worker/socket.rb', line 31
#
# Closes the underlying socket.
#
# @return [Object] whatever the socket's +close+ returns
def close
  @socket.close
end
#identity ⇒ Object
35 36 37 38 39 40 41 42 |
# File 'lib/job_dispatch/worker/socket.rb', line 35
#
# Name identifying this worker, built once and then cached in +@identity+.
# It joins the literal 'ruby', the host name, the process id and the
# current thread's object id (in hexadecimal) with ':'.
#
# @return [String] the cached identity string
def identity
  return @identity if @identity

  parts = [
    'ruby',
    ::Socket.gethostname,
    Process.pid,
    Thread.current.object_id.to_s(16)
  ]
  @identity = parts.join(':')
end
#poll_item ⇒ Object
19 20 21 |
# File 'lib/job_dispatch/worker/socket.rb', line 19
#
# Poll item for this socket registered for ZMQ::POLLIN. It is created on
# first use and cached in +@poll_item+.
#
# @return [Object] the cached result of ZMQ::Pollitem
def poll_item
  return @poll_item if @poll_item

  @poll_item = ZMQ::Pollitem(@socket, ZMQ::POLLIN)
end
#read_item ⇒ JobDispatch::Item
Read an incoming message. The thread will block if there is no readable message.
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/job_dispatch/worker/socket.rb', line 47
#
# Read an incoming message. The thread will block if there is no readable
# message.
#
# The message is parsed as JSON and its "command" decides what is built:
# * "job"  -- an item from the "target", "method" and "parameters" fields
# * "idle" -- an item targeting "JobDispatch" with method "idle"
# * "quit" -- prints a notice and exits the process with status 0
# * anything else -- an "unknown_command" item carrying the parsed message
#
# The item's +job_id+ is set from the message's "job_id" field.
#
# @return [Object, nil] an instance of #item_class, or nil when the message
#   could not be turned into a complete item (the error is logged)
def read_item
  # Receiving stays outside the rescue: socket errors propagate to the caller.
  json = @socket.recv
  begin
    params = JSON.parse(json)
    item =
      case params["command"]
      when "job"
        item_class.new params["target"], params["method"], *params["parameters"]
      when "idle"
        item_class.new "JobDispatch", "idle"
      when "quit"
        puts "It's quittin' time!"
        Process.exit(0)
      else
        item_class.new "JobDispatch", "unknown_command", params
      end
    item.job_id = params["job_id"]
    item
  rescue StandardError => e
    JobDispatch.logger.error "Failed to read message from worker socket: #{e}"
    # The begin/rescue expression is the method's value, so a logged failure
    # always yields nil and never a partially initialised item.
    nil
  end
end
#send_goodbye(queue) ⇒ Object
27 28 29 |
# File 'lib/job_dispatch/worker/socket.rb', line 27
#
# Sends a JSON-encoded 'goodbye' command carrying this worker's #identity
# over the socket.
#
# @param queue [Object] accepted but not included in the message
# @return [Object] whatever the socket's +send+ returns
def send_goodbye(queue)
  farewell = { command: 'goodbye', worker_name: identity }
  @socket.send(JSON.dump(farewell))
end
#send_response(job_id, status, result) ⇒ Object
After execution, send the response.
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/job_dispatch/worker/socket.rb', line 71
#
# After execution, send the response: logs the outcome, then sends a
# JSON-encoded 'completed' command over the socket.
#
# @param job_id [Object] value sent under the +job_id+ key
# @param status [Object] value sent under the +status+ key
# @param result [Object] value sent under the +result+ key
# @return [Object] whatever the socket's +send+ returns
def send_response(job_id, status, result)
  JobDispatch.logger.info "Worker #{Process.pid} completed job_id: #{job_id}: #{status}, result: #{result}"

  # Key order is kept as-is because it determines the order in the JSON text.
  payload = { command: 'completed', ready: true, job_id: job_id, result: result, status: status }
  @socket.send(JSON.dump(payload))
end
#send_touch(job_id, timeout = nil) ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'lib/job_dispatch/worker/socket.rb', line 83
#
# Sends a JSON-encoded 'touch' command for +job_id+ over the socket, then
# blocks on the socket for the reply and decodes it.
#
# @param job_id [Object] value sent under the +job_id+ key
# @param timeout [Object, nil] sent under the +timeout+ key only when truthy
# @return [Object] the parsed JSON reply, or a hash with an +:error+ message
#   when decoding the reply raises a StandardError
def send_touch(job_id, timeout=nil)
  request = { command: 'touch', job_id: job_id }
  request[:timeout] = timeout if timeout
  @socket.send(JSON.dump(request))

  # wait for acknowledgement... this could be done via pub/sub to be asynchronous.
  json = @socket.recv
  begin
    JSON.parse(json)
  rescue StandardError
    {:error => "Failed to decode JSON from dispatcher: #{json}"}
  end
end