Class: LogCourier::ServerZmq

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/server_zmq.rb

Overview

ZMQ transport implementation for the server

Defined Under Namespace

Classes: ZMQError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ServerZmq

Returns a new instance of ServerZmq.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/log-courier/server_zmq.rb', line 44

# Builds the ZMQ server transport: merges the options over the defaults,
# validates the CURVE key when the 'zmq' transport is selected, binds a
# ROUTER socket and registers it (plus the send queue) with a poller.
#
# @param options [Hash] settings merged over the defaults below
# @option options [Object, nil] :logger receives info/warn calls; nil disables logging
# @option options [String] :transport ('zmq') 'zmq' enables the libzmq 4 and CURVE checks
# @option options [Integer] :port (0) TCP port; 0 binds to ':*' and the allocated port is read back
# @option options [String] :address ('0.0.0.0') address to bind on
# @option options [String, nil] :curve_secret_key 40 character Z85 string, required for 'zmq'
# @option options [Integer] :max_packet_size (10_485_760) not used here; handed on via @options
# @option options [Integer] :peer_recv_queue (10) not used here; handed on via @options
# @raise [RuntimeError] when option validation fails or the socket cannot be set up
def initialize(options = {})
  @options = {
    logger:           nil,
    transport:        'zmq',
    port:             0,
    address:          '0.0.0.0',
    curve_secret_key: nil,
    max_packet_size:  10_485_760,
    peer_recv_queue:  10,
  }.merge!(options)

  @logger = @options[:logger]

  self.class.print_zmq_versions @logger

  if @options[:transport] == 'zmq'
    fail "input/courier: Transport 'zmq' requires libzmq version >= 4" unless LibZMQ.version4?

    fail 'input/courier: \'curve_secret_key\' is required' if @options[:curve_secret_key].nil?

    fail 'input/courier: \'curve_secret_key\' must be a valid 40 character Z85 encoded string' if @options[:curve_secret_key].length != 40 || !z85validate(@options[:curve_secret_key])
  end

  begin
    @context = ZMQ::Context.new
    # Router so we can send multiple responses
    @socket = @context.socket(ZMQ::ROUTER)

    if @options[:transport] == 'zmq'
      rc = @socket.setsockopt(ZMQ::CURVE_SERVER, 1)
      fail 'setsockopt CURVE_SERVER failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

      rc = @socket.setsockopt(ZMQ::CURVE_SECRETKEY, @options[:curve_secret_key])
      fail 'setsockopt CURVE_SECRETKEY failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)
    end

    bind = 'tcp://' + @options[:address] + (@options[:port] == 0 ? ':*' : ':' + @options[:port].to_s)
    rc = @socket.bind(bind)
    # Report the real ZMQ error text; a mistyped constant here would raise
    # NameError and hide the reason the bind failed
    fail 'failed to bind at ' + bind + ': ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc)

    # Lookup port number that was allocated in case it was set to 0
    endpoint = ''
    rc = @socket.getsockopt(ZMQ::LAST_ENDPOINT, endpoint)
    # The regex literal must stay on the left of =~ so that the named capture
    # is assigned to the local variable endpoint_port
    fail 'getsockopt LAST_ENDPOINT failure: ' + ZMQ::Util.error_string unless ZMQ::Util.resultcode_ok?(rc) && %r{\Atcp://(?:.*):(?<endpoint_port>\d+)\0\z} =~ endpoint
    @port = endpoint_port.to_i

    if @options[:port] == 0
      @logger.warn 'Ephemeral port allocated', :transport => @options[:transport], :port => @port unless @logger.nil?
    end
  rescue => e
    # Every setup failure is surfaced as a single RuntimeError with a common prefix
    raise "input/courier: Failed to initialise: #{e}"
  end

  # TODO: Implement workers option by receiving on a ROUTER and proxying to a DEALER, with workers connecting to the DEALER

  # TODO: Make this send queue configurable?
  @send_queue = EventQueue.new 2
  @factory = ClientFactoryZmq.new(@options, @send_queue)

  # Setup poller
  @poller = ZMQPoll::ZMQPoll.new(@context)
  @poller.register_socket @socket, ZMQ::POLLIN
  @poller.register_queue_to_socket @send_queue, @socket

  # Register a finaliser that sets @context to nil
  # This allows us to detect the JRuby bug where during "exit!" finalisers
  # are run but threads are not killed - which leaves us in a situation of
  # a terminated @context (it has a terminate finalizer) and an IO thread
  # looping retries
  # JRuby will still crash and burn, but at least we don't spam STDOUT with
  # errors
  ObjectSpace.define_finalizer(self, Proc.new do
    @context = nil
  end)
end

Instance Attribute Details

#port ⇒ Object (readonly)

Returns the value of attribute port.



42
43
44
# File 'lib/log-courier/server_zmq.rb', line 42

# Reader for the @port instance variable, which the constructor assigns from
# the port number parsed out of the socket's last bound endpoint.
#
# @return [Object] the current value of @port
def port
  @port
end

Class Method Details



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/log-courier/server_zmq.rb', line 28

# Reports the libzmq, ffi-rzmq-core and ffi-rzmq versions through the given
# logger. The report is emitted at most once: a flag is remembered in
# @print_zmq_versions after the first successful call.
#
# @param logger [Object, nil] object responding to #info; nil skips the report
# @return [true, nil] true when the versions were logged, nil when skipped
def print_zmq_versions(logger)
  # Already reported once - nothing more to do
  return if @print_zmq_versions
  # Without a logger there is nowhere to report to
  return if logger.nil?

  parts = LibZMQ.version
  dotted = format('%s.%s.%s', parts[:major], parts[:minor], parts[:patch])

  logger.info('libzmq', :version => dotted)
  logger.info('ffi-rzmq-core', :version => LibZMQ::VERSION)
  logger.info('ffi-rzmq', :version => ZMQ.version)

  @print_zmq_versions = true
end

Instance Method Details

#run(&block) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/log-courier/server_zmq.rb', line 120

# Runs the receive loop until shutdown: polls the server socket and calls
# #receive (forwarding the given block) whenever the socket is readable.
# Poll timeouts and ZMQ poll errors are tolerated and the loop continues.
# On exit, by any path, the poller, factory, socket and context are torn down.
#
# @param block [Proc] forwarded unchanged to #receive
# @return [nil]
# @raise [ZMQPoll::ZMQError] when a poll error occurs after @context was cleared
# @raise [StandardError] any other unexpected error, after it has been logged
def run(&block)
  loop do
    begin
      # NOTE(review): 5_000 is presumably a timeout in milliseconds - confirm against ZMQPoll
      @poller.poll(5_000) do |socket, r, _w|
        # Only readable events on our own server socket are of interest
        next if socket != @socket
        next unless r

        receive(&block)
      end
    rescue ZMQPoll::ZMQError => e
      # Detect JRuby bug
      fail e if @context.nil?
      @logger.warn e, :hint => 'ZMQ recv_string failure' unless @logger.nil?
      next
    rescue ZMQPoll::TimeoutError
      # We'll let ZeroMQ manage reconnections and new connections
      # There is no point in us doing any form of reconnect ourselves
      next
    end
  end
  return
rescue ShutdownSignal
  # Shutting down
  @logger.warn 'Server shutting down' unless @logger.nil?
  return
rescue StandardError, NativeException => e
  # Some other unknown problem
  @logger.warn e, :hint => 'Unknown error, shutting down' unless @logger.nil?
  raise e
ensure
  @poller.shutdown
  @factory.shutdown
  @socket.close
  # @context may have been cleared (see the nil check above); calling
  # terminate on nil would raise NoMethodError from this ensure clause and
  # replace the exception that is actually being propagated
  @context.terminate unless @context.nil?
end