Class: Protobuf::Rpc::Zmq::Worker

Inherits:
Object
  • Object
show all
Includes:
Server, Util
Defined in:
lib/protobuf/rpc/servers/zmq/worker.rb

Instance Method Summary collapse

Methods included from Util

#log_signature, #resolve_ip, #zmq_error_check

Methods included from Logging

initialize_logger, #log_exception, #log_signature, #logger, #sign_message

Methods included from Server

#gc_pause, #handle_request, #log_signature

Constructor Details

#initialize(server, broker) ⇒ Worker

Constructor



15
16
17
18
19
20
21
22
23
24
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 15

def initialize(server, broker)
  @server = server
  @broker = broker

  init_zmq_context
  init_backend_socket
rescue
  teardown
  raise
end

Instance Method Details

#process_requestObject

Instance Methods



29
30
31
32
33
34
35
36
37
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 29

def process_request
  client_address, _, data = read_from_backend
  return unless data

  gc_pause do
    encoded_response = handle_request(data)
    write_to_backend([client_address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, encoded_response])
  end
end

#runObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 39

def run
  poller = ::ZMQ::Poller.new
  poller.register_readable(@backend_socket)
  poller.register_readable(@shutdown_socket)

  # Send request to broker telling it we are ready
  write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE])

  loop do
    rc = poller.poll(500)

    if rc == 0 && !running? # rubocop:disable Style/GuardClause
      break # The server was shutdown and no requests are pending
    elsif rc == -1
      break # Something went wrong
    elsif rc > 0
      ::Thread.current[:busy] = true
      process_request
      ::Thread.current[:busy] = false
    end
  end
ensure
  teardown
end

#running?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 64

def running?
  @broker.running? && @server.running?
end