Class: Hackle::EventProcessor::MessageProcessor
- Inherits: Object
  - Object
  - Hackle::EventProcessor::MessageProcessor
- Defined in: lib/hackle-ruby-sdk/events/event_processor.rb
Constant Summary
- DEFAULT_MESSAGE_QUEUE_CAPACITY = 1000
- DEFAULT_MAX_EVENT_DISPATCH_SIZE = 500
Instance Method Summary
- #consuming_loop ⇒ Object
- #initialize(event_dispatcher, config) ⇒ MessageProcessor (constructor): A new instance of MessageProcessor.
- #produce(message, non_block: true) ⇒ Object
Constructor Details
#initialize(event_dispatcher, config) ⇒ MessageProcessor
Returns a new instance of MessageProcessor.
    # File 'lib/hackle-ruby-sdk/events/event_processor.rb', line 66
    def initialize(event_dispatcher, config)
      @logger = config.logger
      @event_dispatcher = event_dispatcher
      # Bounded queue between producers and the consuming loop
      @message_queue = SizedQueue.new(DEFAULT_MESSAGE_QUEUE_CAPACITY)
      # Used by #produce to sample the "queue full" warning
      @random = Random.new
      @consumed_events = []
    end
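The constructor backs the processor with a `SizedQueue`, so it may help to see what the bound means in practice. The sketch below is not SDK code: the method name `blocking_push_demo` and the capacity of 2 are assumptions chosen to make the limit easy to reach. It shows that a default (blocking) push on a full `SizedQueue` parks the producer until a consumer pops.

```ruby
# Sketch only: demonstrates plain SizedQueue behaviour, not the SDK itself.
# `blocking_push_demo` and the capacity of 2 are illustrative assumptions.
def blocking_push_demo
  queue = SizedQueue.new(2)                  # tiny capacity so the bound is easy to hit
  queue.push(:a)
  queue.push(:b)

  producer = Thread.new { queue.push(:c) }   # blocks: the queue is already full
  Thread.pass until queue.num_waiting == 1   # wait until the producer is parked

  size_while_blocked = queue.size            # the third item has not been accepted yet
  first = queue.pop                          # frees a slot, which wakes the producer
  producer.join

  [queue.max, size_while_blocked, first, queue.size]
end
```

With the real capacity of 1000, a blocking producer would stall in the same way once 1000 messages are waiting, which is presumably why `#produce` defaults to `non_block: true`.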
Instance Method Details
#consuming_loop ⇒ Object
    # File 'lib/hackle-ruby-sdk/events/event_processor.rb', line 82
    def consuming_loop
      loop do
        message = @message_queue.pop # blocks until a message is available
        case message
        when Message::Event
          consume_event(message.event)
        when Message::Flush
          dispatch_events
        when Message::Shutdown
          break
        end
      end
    rescue => e
      @logger.warn { "Uncaught exception in events message processor: #{e.inspect}" }
    ensure
      # Send whatever is still buffered, on shutdown or after an error
      dispatch_events
    end
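The loop above can be tried outside the SDK with a small stand-in. Everything in this sketch is hypothetical: `SketchMessage`, `SketchProcessor`, `run_sketch`, and the rule that a batch is dispatched once the buffer reaches `max_batch` (the source does not show `consume_event` or `dispatch_events`, so that rule is only a guess at how `DEFAULT_MAX_EVENT_DISPATCH_SIZE` might be used). Dispatched batches are recorded in an array instead of being sent anywhere.

```ruby
# Hypothetical message types; the SDK's Message::Event, Message::Flush and
# Message::Shutdown are not reproduced here.
module SketchMessage
  Event = Struct.new(:event)
  Flush = Class.new
  Shutdown = Class.new
end

class SketchProcessor
  attr_reader :dispatched

  def initialize(capacity: 10, max_batch: 3)
    @queue = SizedQueue.new(capacity)
    @buffer = []
    @max_batch = max_batch
    @dispatched = [] # stands in for the real event dispatcher
  end

  def produce(message)
    @queue.push(message)
  end

  def consuming_loop
    loop do
      message = @queue.pop
      case message
      when SketchMessage::Event
        @buffer << message.event
        dispatch_events if @buffer.size >= @max_batch # assumed batching rule
      when SketchMessage::Flush
        dispatch_events
      when SketchMessage::Shutdown
        break
      end
    end
  ensure
    dispatch_events # final flush, mirroring the ensure clause above
  end

  private

  # Records the buffered events as one batch and empties the buffer.
  def dispatch_events
    return if @buffer.empty?

    @dispatched << @buffer.dup
    @buffer.clear
  end
end

# Runs the loop on a background thread and returns the recorded batches.
def run_sketch
  processor = SketchProcessor.new
  consumer = Thread.new { processor.consuming_loop }
  4.times { |i| processor.produce(SketchMessage::Event.new("e#{i}")) }
  processor.produce(SketchMessage::Flush.new)
  processor.produce(SketchMessage::Event.new('e4'))
  processor.produce(SketchMessage::Shutdown.new)
  consumer.join
  processor.dispatched
end
```

`run_sketch` returns three batches: one triggered by the size limit, one by the flush message, and one by the final flush in `ensure` after shutdown.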
#produce(message, non_block: true) ⇒ Object
    # File 'lib/hackle-ruby-sdk/events/event_processor.rb', line 74
    def produce(message, non_block: true)
      # With non_block set, a full queue raises ThreadError instead of blocking
      @message_queue.push(message, non_block)
    rescue ThreadError
      if @random.rand(1..100) == 1 # log only 1% of the time
        @logger.warn { 'Events are produced faster than can be consumed. Some events will be dropped.' }
      end
    end
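The drop-on-overflow behaviour of `#produce` can be illustrated in isolation. In this sketch, `SketchProducer`, `overflow_demo`, the `dropped` counter, and the boolean return value are assumptions added for demonstration; the SDK method shown above only logs a sampled warning and exposes no counter.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for the producing side of the processor.
class SketchProducer
  attr_reader :dropped

  def initialize(capacity:, logger:, random: Random.new)
    @queue = SizedQueue.new(capacity)
    @logger = logger
    @random = random
    @dropped = 0 # illustrative counter, not part of the SDK
  end

  # Returns true when the message was queued, false when it was dropped.
  def produce(message, non_block: true)
    @queue.push(message, non_block) # raises ThreadError if full and non_block
    true
  rescue ThreadError
    @dropped += 1
    # Sample the warning so a sustained overflow does not flood the log
    @logger.warn { 'queue full, message dropped' } if @random.rand(1..100) == 1
    false
  end
end

# Pushes five messages into a queue of capacity 2 with no consumer running.
def overflow_demo
  producer = SketchProducer.new(capacity: 2, logger: Logger.new(StringIO.new))
  results = 5.times.map { |i| producer.produce(i) }
  [results, producer.dropped]
end
```

The first two pushes fit within the capacity and the remaining three are dropped without blocking the caller, which is the trade-off a non-blocking producer makes: callers never stall, at the cost of losing events under sustained load.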