Class: Hackle::EventProcessor
- Inherits:
-
Object
- Object
- Hackle::EventProcessor
show all
- Defined in:
- lib/hackle/events/event_processor.rb
Defined Under Namespace
Classes: Message, MessageProcessor
Constant Summary
collapse
- DEFAULT_FLUSH_INTERVAL =
10
Instance Method Summary
collapse
Constructor Details
#initialize(config:, event_dispatcher:) ⇒ EventProcessor
Returns a new instance of EventProcessor.
11
12
13
14
15
16
17
18
|
# File 'lib/hackle/events/event_processor.rb', line 11
def initialize(config:, event_dispatcher:)
@logger = config.logger
@event_dispatcher = event_dispatcher
@message_processor = MessageProcessor.new(config: config, event_dispatcher: event_dispatcher)
@flush_task = Concurrent::TimerTask.new(execution_interval: DEFAULT_FLUSH_INTERVAL) { flush }
@consume_task = nil
@running = false
end
|
Instance Method Details
#flush ⇒ Object
46
47
48
|
# File 'lib/hackle/events/event_processor.rb', line 46
def flush
@message_processor.produce(message: Message::Flush.new)
end
|
#process(event:) ⇒ Object
42
43
44
|
# File 'lib/hackle/events/event_processor.rb', line 42
def process(event:)
@message_processor.produce(message: Message::Event.new(event))
end
|
#start! ⇒ Object
20
21
22
23
24
25
26
|
# File 'lib/hackle/events/event_processor.rb', line 20
def start!
return if @running
@consume_task = Thread.new { @message_processor.consuming_loop }
@flush_task.execute
@running = true
end
|
#stop! ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/hackle/events/event_processor.rb', line 28
def stop!
return unless @running
@logger.info { 'Shutting down Hackle event_processor' }
@message_processor.produce(message: Message::Shutdown.new, non_block: false)
@consume_task.join(10)
@flush_task.shutdown
@event_dispatcher.shutdown
@running = false
end
|