Class: Protobuf::Rpc::Zmq::Worker

Inherits:
Object
  • Object
show all
Includes:
Server, Util
Defined in:
lib/protobuf/rpc/servers/zmq/worker.rb

Instance Method Summary collapse

Methods included from Util

#log_signature, #resolve_ip, #zmq_error_check

Methods included from Logger::LogMethods

included, #log_exception, #log_signature, #sign_message

Methods included from Server

#gc_pause, #handle_request, #log_signature

Constructor Details

#initialize(server) ⇒ Worker

Constructor



15
16
17
18
19
20
21
22
23
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 15

def initialize(server)
  @server = server

  init_zmq_context
  init_backend_socket
rescue
  teardown
  raise
end

Instance Method Details

#process_requestObject

Instance Methods



28
29
30
31
32
33
34
35
36
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 28

def process_request
  client_address, _, data = read_from_backend
  return unless data

  gc_pause do
    encoded_response = handle_request(data)
    write_to_backend([client_address, "", encoded_response])
  end
end

#runObject



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 38

def run
  poller = ::ZMQ::Poller.new
  poller.register_readable(@backend_socket)
  poller.register_readable(@shutdown_socket)

  # Send request to broker telling it we are ready
  write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE])

  loop do
    rc = poller.poll(500)

    # The server was shutdown and no requests are pending
    break if rc == 0 && !running?

    # Something went wrong
    break if rc == -1

    if rc > 0
      ::Thread.current[:busy] = true
      process_request
      ::Thread.current[:busy] = false
    end
  end
ensure
  teardown
end

#running?Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/protobuf/rpc/servers/zmq/worker.rb', line 65

def running?
  @server.running?
end