Class: Hackle::EventProcessor::MessageProcessor
- Inherits: Object
  - Object
    - Hackle::EventProcessor::MessageProcessor
- Defined in: lib/hackle/events/event_processor.rb
Constant Summary
- DEFAULT_MESSAGE_QUEUE_CAPACITY = 1000
- DEFAULT_MAX_EVENT_DISPATCH_SIZE = 500
Instance Method Summary
- #consuming_loop ⇒ Object
- #initialize(config:, event_dispatcher:) ⇒ MessageProcessor (constructor)
  A new instance of MessageProcessor.
- #produce(message:, non_block: true) ⇒ Object
Constructor Details
#initialize(config:, event_dispatcher:) ⇒ MessageProcessor
Returns a new instance of MessageProcessor.
    # File 'lib/hackle/events/event_processor.rb', line 74

    def initialize(config:, event_dispatcher:)
      @logger = config.logger
      @event_dispatcher = event_dispatcher
      # The queue's instance-variable name is missing from the extracted
      # source; @message_queue is an assumed name.
      @message_queue = SizedQueue.new(DEFAULT_MESSAGE_QUEUE_CAPACITY)
      @random = Random.new
      @consumed_events = []
    end
Instance Method Details
#consuming_loop ⇒ Object
    # File 'lib/hackle/events/event_processor.rb', line 92

    def consuming_loop
      loop do
        # The local and queue names are missing from the extracted source;
        # message and @message_queue are assumed names.
        message = @message_queue.pop
        case message
        when Message::Event
          consume_event(event: message.event)
        when Message::Flush
          dispatch_events
        when Message::Shutdown
          break
        end
      end
    rescue => e
      @logger.warn { "Uncaught exception in events message processor: #{e.inspect}" }
    ensure
      dispatch_events
    end
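The loop above follows a common consumer pattern: block on a queue, buffer events, dispatch on a flush message, stop on a shutdown message, and dispatch whatever remains on the way out. The following is a minimal, standalone sketch of that pattern, not the library's implementation. `EventMessage`, `FlushMessage`, `ShutdownMessage`, and `SketchConsumer` are hypothetical names invented for illustration, and "dispatching" here only records batches in an array.

```ruby
# Hypothetical message types standing in for the library's Message classes.
EventMessage = Struct.new(:event)
FlushMessage = Class.new
ShutdownMessage = Class.new

# Hypothetical consumer; it records batches instead of sending them anywhere.
class SketchConsumer
  attr_reader :dispatched

  def initialize(queue)
    @queue = queue
    @buffer = []
    @dispatched = []
  end

  def consuming_loop
    loop do
      message = @queue.pop # blocks until a message is available
      case message
      when EventMessage then @buffer << message.event
      when FlushMessage then dispatch
      when ShutdownMessage then break
      end
    end
  ensure
    dispatch # hand off anything still buffered when the loop exits
  end

  private

  def dispatch
    return if @buffer.empty?

    @dispatched << @buffer.dup
    @buffer.clear
  end
end

queue = SizedQueue.new(10)
consumer = SketchConsumer.new(queue)
worker = Thread.new { consumer.consuming_loop }

queue.push(EventMessage.new('a'))
queue.push(EventMessage.new('b'))
queue.push(FlushMessage.new)
queue.push(EventMessage.new('c'))
queue.push(ShutdownMessage.new)
worker.join

batches = consumer.dispatched
```

In this sketch the final event is never explicitly flushed, so it is only delivered because the `ensure` clause runs a last dispatch after the shutdown message breaks the loop.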
#produce(message:, non_block: true) ⇒ Object
    # File 'lib/hackle/events/event_processor.rb', line 84

    def produce(message:, non_block: true)
      # The queue's instance-variable name is missing from the extracted
      # source; @message_queue is an assumed name.
      @message_queue.push(message, non_block)
    rescue ThreadError
      if @random.rand(1..100) == 1 # log only 1% of the time
        @logger.warn { 'Events are produced faster than can be consumed. Some events will be dropped.' }
      end
    end
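`#produce` relies on a standard behavior of Ruby's `SizedQueue`: a non-blocking push onto a full queue raises `ThreadError` instead of waiting, so the caller can drop the message rather than stall. The sketch below demonstrates that behavior in isolation. `BoundedProducer` and its `dropped` counter are hypothetical, added only so the effect is observable; the documented method logs a sampled warning instead of counting.

```ruby
# Hypothetical wrapper that counts drops instead of logging them.
class BoundedProducer
  attr_reader :dropped

  def initialize(capacity:)
    @queue = SizedQueue.new(capacity)
    @dropped = 0
  end

  # Returns true if the message was queued, false if it was dropped.
  def produce(message, non_block: true)
    @queue.push(message, non_block)
    true
  rescue ThreadError
    # Raised by SizedQueue#push when the queue is full and non_block is true.
    @dropped += 1
    false
  end

  def size
    @queue.size
  end
end

producer = BoundedProducer.new(capacity: 2)
results = [1, 2, 3].map { |m| producer.produce(m) }
```

With a capacity of 2 and no consumer draining the queue, the third call cannot be queued and is counted as dropped. Passing `non_block: false` in the same situation would instead block the calling thread until space became available.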