Class: LogCourier::ServerZmq

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/server_zmq.rb

Overview

ZMQ transport implementation for the server

Defined Under Namespace

Classes: ZMQError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ServerZmq

Returns a new instance of ServerZmq.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/log-courier/server_zmq.rb', line 44

def initialize(options = {})
  @options = {
    logger:           nil,
    transport:        'zmq',
    port:             0,
    address:          '0.0.0.0',
    curve_secret_key: nil,
    max_packet_size:  10_485_760,
    peer_recv_queue:  10,
  }.merge!(options)

  @logger = @options[:logger]

  self.class.print_zmq_versions @logger

  if @options[:transport] == 'zmq'
    fail "input/courier: Transport 'zmq' requires libzmq version >= 4" unless LibZMQ.version4?

    fail 'input/courier: \'curve_secret_key\' is required' if @options[:curve_secret_key].nil?

    fail 'input/courier: \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if @options[:curve_secret_key].length != 40 || !z85validate(@options[:curve_secret_key])
  end

  begin
    @context = ZMQ::Context.new
    fail ZMQError, 'context creation error: ' + ZMQ::Util.error_string if @context.nil?

    # Router so we can send multiple responses
    @socket = @context.socket(ZMQ::ROUTER)
    fail ZMQError, 'socket creation error: ' + ZMQ::Util.error_string if @socket.nil?

    rc = @socket.setsockopt(ZMQ::LINGER, 0)
    fail ZMQError, 'setsockopt LINGER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    if @options[:transport] == 'zmq'
      rc = @socket.setsockopt(ZMQ::CURVE_SERVER, 1)
      fail 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

      rc = @socket.setsockopt(ZMQ::CURVE_SECRETKEY, @options[:curve_secret_key])
      fail 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)
    end

    bind = 'tcp://' + @options[:address] + (@options[:port] == 0 ? ':*' : ':' + @options[:port].to_s)
    rc = @socket.bind(bind)
    fail 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    # Lookup port number that was allocated in case it was set to 0
    endpoint = ''
    rc = @socket.getsockopt(ZMQ::LAST_ENDPOINT, endpoint)
    fail 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint
    @port = endpoint_port.to_i

    if @options[:port] == 0
      @logger.warn 'Ephemeral port allocated', :transport => @options[:transport], :port => @port unless @logger.nil?
    end
  rescue => e
    raise "input/courier: Failed to initialise: #{e}"
  end

  # TODO: Implement workers option by receiving on a ROUTER and proxying to a DEALER, with workers connecting to the DEALER

  # TODO: Make this send queue configurable?
  @send_queue = EventQueue.new 2
  @factory = ClientFactoryZmq.new(@options, @send_queue)

  # Setup poller
  @poller = ZMQPoll::ZMQPoll.new(@context)
  @poller.register_socket @socket, ZMQ::POLLIN
  @poller.register_queue_to_socket @send_queue, @socket
end

Instance Attribute Details

#portObject (readonly)

Returns the value of attribute port.



42
43
44
# File 'lib/log-courier/server_zmq.rb', line 42

def port
  @port
end

Class Method Details



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/log-courier/server_zmq.rb', line 28

def print_zmq_versions(logger)
  return if @print_zmq_versions || logger.nil?

  libversion = LibZMQ.version
  libversion = "#{libversion[:major]}.#{libversion[:minor]}.#{libversion[:patch]}"

  logger.info 'libzmq', :version => libversion
  logger.info 'ffi-rzmq-core', :version => LibZMQ::VERSION
  logger.info 'ffi-rzmq', :version => ZMQ.version

  @print_zmq_versions = true
end

Instance Method Details

#run(&block) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/log-courier/server_zmq.rb', line 115

def run(&block)
  errors = 0
  loop do
    begin
      @poller.poll(5_000) do |socket, r, w|
        next if socket != @socket
        next if !r

        receive &block
      end
      errors = 0
    rescue ZMQPoll::ZMQError => e
      @logger.warn e, :hint => 'ZMQ poll failure' unless @logger.nil?
      next
    rescue ZMQPoll::ZMQTerm
      # Fall into shutdown signal, context was terminated
      # This can happen in JRuby - it seems to run finalisers too early
      fail ShutdownSignal
    rescue ZMQPoll::TimeoutError
      # We'll let ZeroMQ manage reconnections and new connections
      # There is no point in us doing any form of reconnect ourselves
      next
    end
  end
  return
rescue ShutdownSignal
  # Shutting down
  @logger.warn 'Server shutting down' unless @logger.nil?
  return
rescue StandardError, NativeException => e
  # Some other unknown problem
  @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil?
  return
ensure
  @poller.shutdown
  @factory.shutdown
  @socket.close
  @context.terminate
end