Class: LogCourier::ServerZmq
- Inherits: Object
- Inheritance chain: Object → LogCourier::ServerZmq
- Defined in:
- lib/log-courier/server_zmq.rb
Overview
ZMQ transport implementation for the server
Defined Under Namespace
Classes: ZMQError
Instance Attribute Summary collapse
-
#port ⇒ Object
readonly
Returns the value of attribute port.
Class Method Summary collapse
- .print_zmq_versions(logger) ⇒ Object
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ ServerZmq
constructor
A new instance of ServerZmq.
- #run(&block) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ ServerZmq
Returns a new instance of ServerZmq.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/log-courier/server_zmq.rb', line 44
#
# Creates the ZMQ context and ROUTER socket, binds it, records the bound port
# in @port, and wires the socket and the send queue into a ZMQPoll poller.
#
# Recognised option keys and their defaults:
#   :logger           - nil; when set, receives version info and warnings
#   :transport        - 'zmq'; CURVE security is only configured for 'zmq'
#   :port             - 0; 0 binds to a wildcard port (tcp://address:*)
#   :address          - '0.0.0.0'
#   :curve_secret_key - nil; required (40 character Z85 string) for 'zmq'
#   :max_packet_size  - 10_485_760; not read here, forwarded with the rest of
#                       the options to ClientFactoryZmq
#   :peer_recv_queue  - 10; not read here, forwarded likewise
#
# Raises RuntimeError when the 'zmq' transport prerequisites are not met, and
# RuntimeError ("input/courier: Failed to initialise: ...") when any step of
# the socket setup fails.
def initialize(options = {})
  options = {
    logger: nil,
    transport: 'zmq',
    port: 0,
    address: '0.0.0.0',
    curve_secret_key: nil,
    max_packet_size: 10_485_760,
    peer_recv_queue: 10,
  }.merge!(options)

  @logger = options[:logger]

  self.class.print_zmq_versions @logger

  if options[:transport] == 'zmq'
    fail "input/courier: Transport 'zmq' requires libzmq version >= 4" unless LibZMQ.version4?

    fail 'input/courier: \'curve_secret_key\' is required' if options[:curve_secret_key].nil?

    fail 'input/courier: \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if options[:curve_secret_key].length != 40 || !z85validate(options[:curve_secret_key])
  end

  begin
    @context = ZMQ::Context.new
    # Router so we can send multiple responses
    @socket = @context.socket(ZMQ::ROUTER)

    if options[:transport] == 'zmq'
      rc = @socket.setsockopt(ZMQ::CURVE_SERVER, 1)
      fail 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

      rc = @socket.setsockopt(ZMQ::CURVE_SECRETKEY, options[:curve_secret_key])
      fail 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)
    end

    bind = 'tcp://' + options[:address] + (options[:port] == 0 ? ':*' : ':' + options[:port].to_s)
    rc = @socket.bind(bind)
    # The constant must be ZMQ::Util; a misspelt constant here would raise
    # NameError and the rescue below would report that instead of the real
    # bind failure.
    fail 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    # Lookup port number that was allocated in case it was set to 0
    endpoint = ''
    rc = @socket.getsockopt(ZMQ::LAST_ENDPOINT, endpoint)
    # The regexp literal is on the left of =~ so that the named capture
    # endpoint_port becomes a local variable on a successful match.
    fail 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint
    @port = endpoint_port.to_i

    if options[:port] == 0
      @logger.warn 'Ephemeral port allocated', :transport => options[:transport], :port => @port unless @logger.nil?
    end
  rescue => e
    raise "input/courier: Failed to initialise: #{e}"
  end

  # TODO: Implement workers option by receiving on a ROUTER and proxying to a DEALER, with workers connecting to the DEALER

  # TODO: Make this send queue configurable?
  @send_queue = EventQueue.new 2
  @factory = ClientFactoryZmq.new(options, @send_queue)

  # Setup poller
  @poller = ZMQPoll::ZMQPoll.new(@context)
  @poller.register_socket @socket, ZMQ::POLLIN
  @poller.register_queue_to_socket @send_queue, @socket

  # Register a finaliser that sets @context to nil
  # This allows us to detect the JRuby bug where during "exit!" finalisers
  # are run but threads are not killed - which leaves us in a situation of
  # a terminated @context (it has a terminate finalizer) and an IO thread
  # looping retries
  # JRuby will still crash and burn, but at least we don't spam STDOUT with
  # errors
  ObjectSpace.define_finalizer(self, Proc.new do
    @context = nil
  end)
end
Instance Attribute Details
#port ⇒ Object (readonly)
Returns the value of attribute port.
42 43 44 |
# File 'lib/log-courier/server_zmq.rb', line 42
#
# Returns the value of the port attribute (read-only). #initialize assigns it
# from the port number parsed out of the socket's LAST_ENDPOINT value.
def port
  @port
end
Class Method Details
.print_zmq_versions(logger) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/log-courier/server_zmq.rb', line 28
#
# Logs the libzmq, ffi-rzmq-core and ffi-rzmq versions at info level, once.
#
# Does nothing when logger is nil or when the versions have already been
# printed (tracked in @print_zmq_versions on the receiver). A call with a nil
# logger does not set the flag, so a later call with a real logger still
# prints.
#
# logger - object responding to #info(message, data), or nil
#
# Returns nil when skipped, true after printing.
def print_zmq_versions(logger)
  return if @print_zmq_versions || logger.nil?

  # LibZMQ.version is read as a hash with :major, :minor and :patch keys
  libversion = LibZMQ.version
  libversion = "#{libversion[:major]}.#{libversion[:minor]}.#{libversion[:patch]}"

  logger.info 'libzmq', :version => libversion
  logger.info 'ffi-rzmq-core', :version => LibZMQ::VERSION
  logger.info 'ffi-rzmq', :version => ZMQ.version

  @print_zmq_versions = true
end
Instance Method Details
#run(&block) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/log-courier/server_zmq.rb', line 120
#
# Polls the server socket in 5_000 unit timeouts (presumably milliseconds -
# confirm against ZMQPoll#poll) and calls #receive with the given block each
# time the server socket is reported readable. Loops until an exception ends
# it.
#
# block - forwarded unchanged to #receive
#
# Returns nil on ShutdownSignal. Any other StandardError or NativeException
# is logged (when a logger is set) and re-raised. In every case the poller,
# factory, socket and context are shut down before the method exits.
def run(&block)
  loop do
    begin
      @poller.poll(5_000) do |socket, r, _w|
        # Only the server socket's read readiness is of interest here
        next if socket != @socket
        next if !r

        receive(&block)
      end
    rescue ZMQPoll::ZMQError => e
      # Detect JRuby bug
      fail e if @context.nil?
      @logger.warn e, :hint => 'ZMQ recv_string failure' unless @logger.nil?
      next
    rescue ZMQPoll::TimeoutError
      # We'll let ZeroMQ manage reconnections and new connections
      # There is no point in us doing any form of reconnect ourselves
      next
    end
  end
  return
rescue ShutdownSignal
  # Shutting down
  @logger.warn 'Server shutting down' unless @logger.nil?
  return
rescue StandardError, NativeException => e
  # Some other unknown problem
  @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil?
  raise e
ensure
  @poller.shutdown
  @factory.shutdown
  @socket.close
  # @context is nil on the "Detect JRuby bug" path above; calling terminate
  # on nil would raise NoMethodError here and mask the error being raised.
  @context.terminate unless @context.nil?
end