Class: Protobuf::Rpc::Zmq::Worker
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Zmq::Worker
- Defined in:
- lib/protobuf/rpc/servers/zmq/worker.rb
Instance Method Summary collapse
-
#initialize(server, broker) ⇒ Worker
constructor
Constructor.
-
#process_request ⇒ Object
Instance Methods.
- #run ⇒ Object
- #running? ⇒ Boolean
Methods included from Util
#log_signature, #resolve_ip, #zmq_error_check
Methods included from Logging
initialize_logger, #log_exception, #log_signature, #logger, #sign_message
Methods included from Server
#gc_pause, #handle_request, #log_signature
Constructor Details
#initialize(server, broker) ⇒ Worker
Constructor
15 16 17 18 19 20 21 22 23 24 |
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 15 def initialize(server, broker) @server = server @broker = broker init_zmq_context init_backend_socket rescue teardown raise end |
Instance Method Details
#process_request ⇒ Object
Instance Methods
29 30 31 32 33 34 35 36 37 |
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 29 def process_request client_address, _, data = read_from_backend return unless data gc_pause do encoded_response = handle_request(data) write_to_backend([client_address, ::Protobuf::Rpc::Zmq::EMPTY_STRING, encoded_response]) end end |
#run ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 39 def run poller = ::ZMQ::Poller.new poller.register_readable(@backend_socket) poller.register_readable(@shutdown_socket) # Send request to broker telling it we are ready write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE]) loop do rc = poller.poll(500) if rc == 0 && !running? # rubocop:disable Style/GuardClause break # The server was shutdown and no requests are pending elsif rc == -1 break # Something went wrong elsif rc > 0 ::Thread.current[:busy] = true process_request ::Thread.current[:busy] = false end end ensure teardown end |
#running? ⇒ Boolean
64 65 66 |
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 64 def running? @broker.running? && @server.running? end |