Class: LogCourier::ClientZmq
Instance Attribute Summary
Attributes inherited from EventQueue
#max
Instance Method Summary
collapse
Methods inherited from EventQueue
#clear, #empty?, #length, #num_waiting, #pop, #push
Constructor Details
#initialize(factory, source, source_str, &try_drop) ⇒ ClientZmq
Returns a new instance of ClientZmq.
327
328
329
330
331
332
333
334
335
336
337
|
# File 'lib/log-courier/server_zmq.rb', line 327
# Sets up the per-source client state and then initialises the underlying
# EventQueue.
#
# @param factory    object exposing +options+ (indexed with :logger and
#                   :peer_recv_queue) and +send_queue+
# @param source     stored as @source; passed to the drop callback and
#                   prepended to outgoing frames by #send
# @param source_str stored as @source_str; used as the :source field of
#                   log entries
# @param try_drop   block stored as @try_drop; #run calls it with @source
#                   after a receive timeout
def initialize(factory, source, source_str, &try_drop)
  # Plain bookkeeping first; none of these touch the factory.
  @factory    = factory
  @source     = source
  @source_str = source_str
  @try_drop   = try_drop

  # Shared resources handed out by the factory.
  @logger     = factory.options[:logger]
  @send_queue = factory.send_queue

  # NOTE(review): the superclass argument is presumably the queue's maximum
  # size (EventQueue exposes #max) -- confirm against EventQueue#initialize.
  super(factory.options[:peer_recv_queue])
end
|
Instance Method Details
#add_fields(event) ⇒ Object
368
369
|
# File 'lib/log-courier/server_zmq.rb', line 368
# No-op hook: accepts an event, leaves it untouched and returns nil.
#
# @param event the event that would receive extra fields (ignored here)
# @return [nil]
def add_fields(event)
  nil
end
|
#run(&block) ⇒ Object
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
|
# File 'lib/log-courier/server_zmq.rb', line 339
# Receive loop for this source: repeatedly pops queued data (30 second
# timeout) and hands it to +recv+ together with the caller's block.
#
# * On TimeoutError the drop callback is asked, with @source, whether to
#   give up; a truthy answer ends the loop, anything else keeps waiting.
# * ShutdownSignal ends the loop quietly (logged at info level).
# * Any other StandardError / NativeException is logged at warn level and
#   re-raised to the caller.
#
# @param block forwarded unchanged to +recv+
# @return [nil]
def run(&block)
  loop do
    begin
      recv(pop(30), &block)
    rescue TimeoutError
      # Idle for the whole timeout window: stop if the owner agrees to drop
      # this source, otherwise fall through and wait again.
      break if @try_drop.call(@source)
    end
  end
  nil
rescue ShutdownSignal
  unless @logger.nil?
    @logger.info('Source shutting down', :source => @source_str)
  end
  nil
rescue StandardError, NativeException => e
  unless @logger.nil?
    @logger.warn(e, :hint => 'Unknown error, connection aborted', :source => @source_str)
  end
  raise e
end
|
#send(signature, message) ⇒ Object
362
363
364
365
366
|
# File 'lib/log-courier/server_zmq.rb', line 362
# Frames +message+ and queues it for delivery to this client.
#
# The frame is +signature+, then the payload size in bytes packed as a
# 32-bit big-endian unsigned integer ('N'), then the payload. The frame is
# pushed onto @send_queue as @source + ['', frame].
#
# The frame is assembled in binary encoding so that the length prefix is a
# byte count (not a character count) and so that concatenation cannot fail
# with Encoding::CompatibilityError when the payload holds multi-byte text.
#
# NOTE: this method shadows Object#send for instances of this class; use
# +__send__+ or +public_send+ when dynamic dispatch is needed.
#
# @param signature [String] message type marker placed at the start of the frame
# @param message   [String] payload to transmit
# @return [nil]
def send(signature, message)
  # dup before force_encoding so the caller's strings are never mutated
  # (and frozen arguments are accepted).
  header  = signature.dup.force_encoding(Encoding::BINARY)
  payload = message.dup.force_encoding(Encoding::BINARY)
  data = header + [payload.bytesize].pack('N') + payload

  # NOTE(review): the empty string is presumably the ZeroMQ envelope
  # delimiter between the routing identity and the body -- confirm.
  @send_queue.push @source + ['', data]
  nil
end
|