Class: Majordomo::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/majordomo/worker.rb

Constant Summary collapse

HEARTBEAT_LIVENESS =

A value of 3-5 is reasonable.

3

Instance Method Summary collapse

Constructor Details

#initialize(broker, service, verbose = false) ⇒ Worker

Returns a new instance of Worker.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/majordomo/worker.rb', line 15

# Sets up the worker's initial state, creates a ZMQ context and then calls
# +reconnect_to_broker+.
#
# @param broker [Object] stored in @broker (presumably the broker endpoint — confirm)
# @param service [Object] stored in @service (presumably the service name — confirm)
# @param verbose [Boolean] stored in @verbose; defaults to false
# @return [Worker] a new instance of Worker
def initialize(broker, service, verbose = false)
  # Heartbeat bookkeeping
  @heartbeat_at = 0 # when the next HEARTBEAT should be sent
  @liveness = 0 # attempts remaining
  # Heartbeat delay and reconnect delay, both in msecs
  @heartbeat, @reconnect = 2_500, 2_500

  # Internal state: false only at start
  @expect_reply = false

  # Poller timeout
  @timeout = 2_500

  # Return address, if any
  @reply_to = nil

  @broker, @service, @verbose = broker, service, verbose
  @ctx = ZMQ::Context.new

  reconnect_to_broker
end

Instance Method Details

#receive_and_send(reply = nil) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/majordomo/worker.rb', line 37

# Sends +reply+ to the broker when one is given, then polls until
# +handle_message+ produces a result.
#
# @param reply [Array, String, nil] frames to send back for the previous
#   request; a single non-Array frame is wrapped into a one-element list
# @return [Object, nil] the first truthy value returned by +handle_message+,
#   or nil when polling raised and the loop was abandoned
# @raise [RuntimeError] "bad reply" when @expect_reply is set but no reply
#   was given; "not expecting reply" when a reply is given without a stored
#   return address; a polling-error message when the poller returns -1
def receive_and_send(reply = nil)
  # Once @expect_reply is set, every call must carry a reply.
  raise "bad reply" if @expect_reply && !reply

  if reply
    raise "not expecting reply" unless @reply_to
    log reply.inspect
    log reply.class
    # Envelope: return address, empty delimiter frame, then the payload.
    # Array() wraps a bare String, which would otherwise fail on Array#+.
    send_to_broker Majordomo::W_REPLY, nil, [@reply_to, ''] + Array(reply)

    # NOTE(review): this flag is only raised after a reply has been sent, so
    # repeated calls without a reply are never rejected — confirm intended.
    @expect_reply = true
  end

  loop do
    # Poll socket for a message, with timeout
    begin
      items = @poller.poll(@timeout)
    rescue StandardError
      # NOTE(review): Interrupt is not a StandardError, so Ctrl-C is not
      # caught here and propagates to the caller — confirm that is intended.
      break
    end

    case items
    when 0
      # nothing waiting, but not an error
      log "timed out on poll, reducing liveness"
      reduce_liveness
    when -1
      msg = "error polling socket: errno is #{ZMQ::Util.error_string}"
      log msg
      raise msg
    else
      result = handle_message
      if result
        log "result from handle_message: |#{result}|"
        return result
      end
    end

    # Send HEARTBEAT if it's time
    if Time.now > @heartbeat_at
      send_to_broker(Majordomo::W_HEARTBEAT)
      # @heartbeat is scaled by 1e-3 before being added to a Time
      @heartbeat_at = Time.now + 1e-3 * @heartbeat
    end
  end

  log "W: interrupt received, killing worker…"
  nil
end