Class: Majordomo::Worker
- Inherits:
-
Object
- Object
- Majordomo::Worker
- Defined in:
- lib/majordomo/worker.rb
Constant Summary collapse
- HEARTBEAT_LIVENESS =
3-5 is reasonable
3
Instance Method Summary collapse
-
#initialize(broker, service, verbose = false) ⇒ Worker
constructor
A new instance of Worker.
- #receive_and_send(reply = nil) ⇒ Object
Constructor Details
#initialize(broker, service, verbose = false) ⇒ Worker
Returns a new instance of Worker.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/majordomo/worker.rb', line 15
#
# Sets up the worker's timing and protocol state, creates a ZMQ context,
# and then calls reconnect_to_broker.
#
# @param broker  stored unchanged in @broker
# @param service stored unchanged in @service
# @param verbose stored unchanged in @verbose (defaults to false)
def initialize(broker, service, verbose = false)
  # Timing (delays are expressed in milliseconds)
  @heartbeat_at = 0      # When to send HEARTBEAT
  @liveness     = 0      # How many attempts left
  @heartbeat    = 2_500  # Heartbeat delay, msecs
  @reconnect    = 2_500  # Reconnect delay, msecs

  # Internal state
  @expect_reply = false  # false only at start
  @timeout      = 2_500  # poller timeout

  # Return address, if any
  @reply_to = nil

  # Constructor arguments are kept verbatim.
  @broker, @service, @verbose = broker, service, verbose

  @ctx = ZMQ::Context.new

  # Everything above must be in place before this call runs.
  reconnect_to_broker
end
Instance Method Details
#receive_and_send(reply = nil) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/majordomo/worker.rb', line 37 def receive_and_send(reply=nil) # Format and send the reply if we were provided one raise "bad reply" unless (reply or not @expect_reply) if reply raise "not expecting reply" unless @reply_to log reply.inspect log reply.class reply = [@reply_to, ''] + reply send_to_broker Majordomo::W_REPLY, nil, reply @expect_reply = true end while true # Poll socket for a reply, with timeout begin items = @poller.poll(@timeout) rescue => e break # Interrupted end case items when 0 # nothing waiting, but not an error log "timed out on poll, reducing liveness" reduce_liveness when -1 msg = "error polling socket: errno is #{ZMQ::Util.error_string}" log msg raise msg else if (result = ) log "result from handle_message: |#{result}|" return result end end # Send HEARTBEAT if it's time if Time.now > @heartbeat_at send_to_broker(Majordomo::W_HEARTBEAT) @heartbeat_at = Time.now + 1e-3*@heartbeat end end log "W: interrupt received, killing worker…" return nil end |