Class: LogCourier::ServerZmq
- Inherits:
-
Object
- Object
- LogCourier::ServerZmq
- Defined in:
- lib/log-courier/server_zmq.rb
Overview
ZMQ transport implementation for the server
Defined Under Namespace
Classes: ZMQError
Instance Attribute Summary collapse
-
#port ⇒ Object
readonly
Returns the value of attribute port.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ ServerZmq
constructor
A new instance of ServerZmq.
- #run(&block) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ ServerZmq
Returns a new instance of ServerZmq.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/log-courier/server_zmq.rb', line 44

# Builds the ZMQ server transport: merges the caller's options over the
# defaults, validates the CURVE key when the transport is 'zmq', creates the
# context and ROUTER socket, binds it, records the bound port, and wires the
# send queue, client factory and poller together.
#
# @param options [Hash] overrides for the defaults below
# @option options [Object, nil] :logger logger receiving info/warn calls; nil disables logging
# @option options [String] :transport ('zmq') when 'zmq', CURVE security is enabled on the socket
# @option options [Integer] :port (0) port to bind; 0 binds to ':*' and the allocated port is looked up
# @option options [String] :address ('0.0.0.0') address to bind
# @option options [String, nil] :curve_secret_key required for 'zmq'; must be 40 characters and pass z85validate
# @option options [Integer] :max_packet_size (10_485_760) not read here; passed on inside @options
# @option options [Integer] :peer_recv_queue (10) not read here; passed on inside @options
# @raise [RuntimeError] when validation fails or any ZMQ setup step fails
def initialize(options = {})
  @options = {
    logger: nil,
    transport: 'zmq',
    port: 0,
    address: '0.0.0.0',
    curve_secret_key: nil,
    max_packet_size: 10_485_760,
    peer_recv_queue: 10,
  }.merge!(options)

  @logger = @options[:logger]

  self.class.print_zmq_versions @logger

  if @options[:transport] == 'zmq'
    fail "input/courier: Transport 'zmq' requires libzmq version >= 4" unless LibZMQ.version4?
    fail 'input/courier: \'curve_secret_key\' is required' if @options[:curve_secret_key].nil?
    fail 'input/courier: \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if @options[:curve_secret_key].length != 40 || !z85validate(@options[:curve_secret_key])
  end

  begin
    @context = ZMQ::Context.new
    fail ZMQError, 'context creation error: ' + ZMQ::Util.error_string if @context.nil?

    # Router so we can send multiple responses
    @socket = @context.socket(ZMQ::ROUTER)
    fail ZMQError, 'socket creation error: ' + ZMQ::Util.error_string if @socket.nil?

    rc = @socket.setsockopt(ZMQ::LINGER, 0)
    fail ZMQError, 'setsockopt LINGER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    if @options[:transport] == 'zmq'
      rc = @socket.setsockopt(ZMQ::CURVE_SERVER, 1)
      fail 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

      rc = @socket.setsockopt(ZMQ::CURVE_SECRETKEY, @options[:curve_secret_key])
      fail 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)
    end

    # Port 0 means "any port": bind with the ':*' wildcard form
    bind = 'tcp://' + @options[:address] + (@options[:port] == 0 ? ':*' : ':' + @options[:port].to_s)
    rc = @socket.bind(bind)
    fail 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    # Lookup port number that was allocated in case it was set to 0
    # (the pattern expects the endpoint string to end with a NUL byte)
    endpoint = ''
    rc = @socket.getsockopt(ZMQ::LAST_ENDPOINT, endpoint)
    fail 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint
    @port = endpoint_port.to_i

    if @options[:port] == 0
      @logger.warn 'Ephemeral port allocated', :transport => @options[:transport], :port => @port unless @logger.nil?
    end
  rescue => e
    # Release whatever was created before the failure so a failed start does
    # not leak the socket or context; same close-then-terminate order as #run
    @socket.close unless @socket.nil?
    @context.terminate unless @context.nil?
    raise "input/courier: Failed to initialise: #{e}"
  end

  # TODO: Implement workers option by receiving on a ROUTER and proxying to a DEALER, with workers connecting to the DEALER

  # TODO: Make this send queue configurable?
  # NOTE(review): the 2 is presumably the queue capacity - confirm against EventQueue
  @send_queue = EventQueue.new 2
  @factory = ClientFactoryZmq.new(@options, @send_queue)

  # Setup poller
  @poller = ZMQPoll::ZMQPoll.new(@context)
  @poller.register_socket @socket, ZMQ::POLLIN
  @poller.register_queue_to_socket @send_queue, @socket
end
Instance Attribute Details
#port ⇒ Object (readonly)
Returns the value of attribute port.
42 43 44 |
# File 'lib/log-courier/server_zmq.rb', line 42

# Returns the value of the port attribute, which the constructor sets from the
# socket's last bound endpoint.
#
# @return [Integer, nil] the stored port, or nil if it was never set
def port
  @port
end
Class Method Details
.print_zmq_versions(logger) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/log-courier/server_zmq.rb', line 28

# Logs the libzmq, ffi-rzmq-core and ffi-rzmq versions at info level.
#
# Does nothing when no logger is given or when the versions have already been
# printed; the @print_zmq_versions flag makes this a once-only report.
#
# @param logger [Object, nil] object responding to #info(message, fields)
# @return [true, nil] true after logging, nil when skipped
def print_zmq_versions(logger)
  return if @print_zmq_versions || logger.nil?

  parts = LibZMQ.version
  libversion = "#{parts[:major]}.#{parts[:minor]}.#{parts[:patch]}"

  logger.info 'libzmq', :version => libversion
  logger.info 'ffi-rzmq-core', :version => LibZMQ::VERSION
  logger.info 'ffi-rzmq', :version => ZMQ.version

  @print_zmq_versions = true
end
Instance Method Details
#run(&block) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/log-courier/server_zmq.rb', line 115

# Runs the server's poll loop until a shutdown signal or an unexpected error.
#
# Each pass polls with a timeout argument of 5_000 and calls #receive with the
# given block for every readable event on this server's own socket. Poll
# failures are logged and the loop continues; poll timeouts are ignored.
# Whatever ends the loop, the poller, factory, socket and context are shut
# down in that order before returning.
#
# @param block forwarded unchanged to #receive
# @return [nil]
def run(&block)
  # NativeException only exists on some JRuby versions. Naming a missing
  # constant in a rescue clause raises NameError when the clause is evaluated,
  # so only include it when it is actually defined.
  native_errors = defined?(NativeException) ? [NativeException] : []

  loop do
    begin
      @poller.poll(5_000) do |socket, readable, _writable|
        # Only readable events on our own socket are of interest
        next if socket != @socket
        next unless readable

        receive(&block)
      end
    rescue ZMQPoll::ZMQError => e
      @logger.warn e, :hint => 'ZMQ poll failure' unless @logger.nil?
      next
    rescue ZMQPoll::ZMQTerm
      # Fall into shutdown signal, context was terminated
      # This can happen in JRuby - it seems to run finalisers too early
      fail ShutdownSignal
    rescue ZMQPoll::TimeoutError
      # We'll let ZeroMQ manage reconnections and new connections
      # There is no point in us doing any form of reconnect ourselves
      next
    end
  end

  nil
rescue ShutdownSignal
  # Shutting down
  @logger.warn 'Server shutting down' unless @logger.nil?
  nil
rescue StandardError, *native_errors => e
  # Some other unknown problem
  @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil?
  nil
ensure
  @poller.shutdown
  @factory.shutdown
  @socket.close
  @context.terminate
end