Class: LogCourier::ClientZmq
- Inherits:
-
EventQueue
- Object
- EventQueue
- LogCourier::ClientZmq
- Defined in:
- lib/log-courier/server_zmq.rb
Instance Attribute Summary
Attributes inherited from EventQueue
Instance Method Summary collapse
-
#initialize(factory, source, source_str, &try_drop) ⇒ ClientZmq
constructor
A new instance of ClientZmq.
- #run(&block) ⇒ Object
- #send(signature, message) ⇒ Object
Methods inherited from EventQueue
#clear, #empty?, #length, #num_waiting, #pop, #push
Constructor Details
#initialize(factory, source, source_str, &try_drop) ⇒ ClientZmq
Returns a new instance of ClientZmq.
328 329 330 331 332 333 334 335 336 337 338 |
# File 'lib/log-courier/server_zmq.rb', line 328 def initialize(factory, source, source_str, &try_drop) @factory = factory @logger = @factory.[:logger] @send_queue = @factory.send_queue @source = source @source_str = source_str @try_drop = try_drop # Setup the queue for receiving events to process super @factory.[:peer_recv_queue] end |
Instance Method Details
#run(&block) ⇒ Object
340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 |
# File 'lib/log-courier/server_zmq.rb', line 340 def run(&block) loop do begin # TODO: Make timeout configurable? data = self.pop(30) recv(data, &block) rescue TimeoutError # Try to clean up resources - if we fail, new messages have arrived retry if !@try_drop.call(@source) break end end return rescue ShutdownSignal # Shutting down @logger.info 'Source shutting down', :source => @source_str unless @logger.nil? return rescue StandardError, NativeException => e # Some other unknown problem @logger.warn e, :hint => 'Unknown error, connection aborted', :source => @source_str unless @logger.nil? raise e end |
#send(signature, message) ⇒ Object
363 364 365 366 367 |
# File 'lib/log-courier/server_zmq.rb', line 363 def send(signature, ) data = signature + [.length].pack('N') + @send_queue.push @source + ['', data] return end |