Class: Hackle::EventProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/hackle/events/event_processor.rb

Defined Under Namespace

Classes: Message, MessageProcessor

Constant Summary collapse

DEFAULT_FLUSH_INTERVAL =
10

Instance Method Summary collapse

Constructor Details

#initialize(config:, event_dispatcher:) ⇒ EventProcessor

Returns a new instance of EventProcessor.

Parameters:



11
12
13
14
15
16
17
18
# File 'lib/hackle/events/event_processor.rb', line 11

# Wires up the processor's collaborators. Nothing is started here: the
# consumer thread slot is left empty and the flush timer is only constructed.
#
# @param config [Object] supplies the logger (via `config.logger`) and is
#   passed through to MessageProcessor
# @param event_dispatcher [Object] retained on the instance and also passed
#   through to MessageProcessor
def initialize(config:, event_dispatcher:)
  @running = false
  @consume_task = nil
  @logger = config.logger
  @event_dispatcher = event_dispatcher
  @message_processor = MessageProcessor.new(config: config, event_dispatcher: event_dispatcher)
  # The timer's block calls #flush; the interval is DEFAULT_FLUSH_INTERVAL.
  @flush_task = Concurrent::TimerTask.new(execution_interval: DEFAULT_FLUSH_INTERVAL) do
    flush
  end
end

Instance Method Details

#flush ⇒ Object



46
47
48
# File 'lib/hackle/events/event_processor.rb', line 46

# Hands a freshly built Message::Flush to the message processor.
#
# @return [Object] whatever `@message_processor.produce` returns
def flush
  flush_request = Message::Flush.new
  @message_processor.produce(message: flush_request)
end

#process(event:) ⇒ Object

Parameters:



42
43
44
# File 'lib/hackle/events/event_processor.rb', line 42

# Wraps the given event in a Message::Event and hands it to the message
# processor.
#
# @param event [Object] the event to wrap; passed unchanged to Message::Event.new
# @return [Object] whatever `@message_processor.produce` returns
def process(event:)
  event_message = Message::Event.new(event)
  @message_processor.produce(message: event_message)
end

#start! ⇒ Object



20
21
22
23
24
25
26
# File 'lib/hackle/events/event_processor.rb', line 20

# Starts background work unless it is already running: spawns a thread that
# runs the message processor's `consuming_loop`, calls `execute` on the flush
# timer, then marks the processor as running.
#
# @return [true, nil] true when started by this call, nil when already running
def start!
  unless @running
    @consume_task = Thread.new do
      @message_processor.consuming_loop
    end
    @flush_task.execute
    @running = true
  end
end

#stop! ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/hackle/events/event_processor.rb', line 28

# Shuts the processor down if it is running. In order: logs the shutdown,
# hands a Message::Shutdown to the message processor, waits up to 10 seconds
# for the consumer thread, shuts down the flush timer and then the event
# dispatcher, and finally clears the running flag.
#
# @return [false, nil] false after a shutdown, nil when it was not running
def stop!
  return unless @running

  @logger.info { 'Shutting down Hackle event_processor' }

  # NOTE(review): non_block: false presumably makes this enqueue blocking —
  # confirm against MessageProcessor#produce.
  shutdown_message = Message::Shutdown.new
  @message_processor.produce(message: shutdown_message, non_block: false)

  # Bounded wait; the remaining steps run whether or not the thread finished.
  @consume_task.join(10)

  [@flush_task, @event_dispatcher].each(&:shutdown)

  @running = false
end