Class: LogCourier::ClientZmq

Inherits:
EventQueue show all
Defined in:
lib/log-courier/server_zmq.rb

Instance Attribute Summary

Attributes inherited from EventQueue

#max

Instance Method Summary collapse

Methods inherited from EventQueue

#clear, #empty?, #length, #num_waiting, #pop, #push

Constructor Details

#initialize(factory, source, source_str, &try_drop) ⇒ ClientZmq

Returns a new instance of ClientZmq.



331
332
333
334
335
336
337
338
339
340
341
# File 'lib/log-courier/server_zmq.rb', line 331

def initialize(factory, source, source_str, &try_drop)
  @factory = factory
  @logger = @factory.options[:logger]
  @send_queue = @factory.send_queue
  @source = source
  @source_str = source_str
  @try_drop = try_drop

  # Setup the queue for receiving events to process
  super @factory.options[:peer_recv_queue]
end

Instance Method Details

#add_fields(event) ⇒ Object



372
373
# File 'lib/log-courier/server_zmq.rb', line 372

def add_fields(event)
end

#run(&block) ⇒ Object



343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/log-courier/server_zmq.rb', line 343

def run(&block)
  loop do
    begin
      # TODO: Make timeout configurable?
      data = self.pop(30)
      recv(data, &block)
    rescue TimeoutError
      # Try to clean up resources - if we fail, new messages have arrived
      retry if !@try_drop.call(@source)
      break
    end
  end
  return
rescue ShutdownSignal
  # Shutting down
  @logger.info 'Source shutting down', :source => @source_str unless @logger.nil?
  return
rescue StandardError, NativeException => e
  # Some other unknown problem
  @logger.warn e, :hint => 'Unknown error, connection aborted', :source => @source_str unless @logger.nil?
  raise e
end

#send(signature, message) ⇒ Object



366
367
368
369
370
# File 'lib/log-courier/server_zmq.rb', line 366

def send(signature, message)
  data = signature + [message.length].pack('N') + message
  @send_queue.push @source + ['', data]
  return
end