Class: EventSourcery::EventProcessing::ESPRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/event_sourcery/event_processing/esp_runner.rb

Overview

NOTE: Databases should be disconnected before running this, e.g. by calling `EventSourcery.config.postgres.event_store_database.disconnect`.

Instance Method Summary collapse

Constructor Details

#initialize(event_processors:, event_source:, max_seconds_for_processes_to_terminate: 30, shutdown_requested: false, after_fork: nil, after_subprocess_termination: nil, logger: EventSourcery.logger) ⇒ ESPRunner

Returns a new instance of ESPRunner.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 6

# Builds a runner; nothing is started until {#start!} is called.
#
# @param event_processors the processors this runner is responsible for
#   (presumably one child process each; the code that iterates them is not
#   visible here)
# @param event_source passed to every ESPProcess created by {#start_processor}
# @param max_seconds_for_processes_to_terminate [Numeric] stored as-is;
#   presumably the grace period consulted while waiting for children to stop
#   (TODO confirm against `waited_long_enough?`)
# @param shutdown_requested [Boolean] initial value of the shutdown flag
#   returned by {#shutdown_requested?}
# @param after_fork optional hook, forwarded to each ESPProcess
# @param after_subprocess_termination optional hook, stored for later use
#   (its call site is not visible here)
# @param logger defaults to EventSourcery.logger
def initialize(
  event_processors:,
  event_source:,
  max_seconds_for_processes_to_terminate: 30,
  shutdown_requested: false,
  after_fork: nil,
  after_subprocess_termination: nil,
  logger: EventSourcery.logger
)
  # Collaborators supplied by the caller.
  @event_processors = event_processors
  @event_source = event_source
  @logger = logger

  # Optional lifecycle hooks.
  @after_fork = after_fork
  @after_subprocess_termination = after_subprocess_termination

  # Shutdown configuration and flag.
  @max_seconds_for_processes_to_terminate = max_seconds_for_processes_to_terminate
  @shutdown_requested = shutdown_requested

  # Bookkeeping: child pid => event processor (filled by #start_processor),
  # and the overall status, which starts out as success.
  @pids = {}
  @exit_status = true
end

Instance Method Details

#shutdown ⇒ Object



52
53
54
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 52

# Requests a shutdown by raising the flag that {#shutdown_requested?} reports.
# This method only sets the flag; it does not signal or stop any process
# itself.
#
# @return [true]
def shutdown
  @shutdown_requested = true
end

#shutdown_requested? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 56

# Reports the shutdown flag: the value given to the constructor's
# `shutdown_requested:` argument until {#shutdown} sets it to true.
#
# @return [Boolean] the stored flag, returned as-is
def shutdown_requested?
  @shutdown_requested
end

#start! ⇒ Object

Start each Event Stream Processor in a new child process.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 25

# Start each Event Stream Processor in a new child process, then supervise
# the children until all of them are gone.
#
# Inside `with_logging`, in this order:
# 1. start the processes and install the shutdown signal listeners;
# 2. keep recording terminated processes while waiting for a shutdown;
# 3. ask the remaining processes to terminate and keep recording until
#    either all have terminated or the wait limit is reached;
# 4. kill whatever is left and keep recording until none remain.
#
# Finally, outside `with_logging`, hands over to
# `exit_indicating_status_of_processes` (judging by its name this exits the
# current process; its body is not visible here).
def start!
  with_logging do
    start_processes
    listen_for_shutdown_signals
    while_waiting_for_shutdown { record_terminated_processes }

    # Graceful phase: bounded by waited_long_enough?.
    terminate_remaining_processes
    record_terminated_processes until all_processes_terminated? || waited_long_enough?

    # Forced phase: no time limit, loops until every child is accounted for.
    kill_remaining_processes
    until all_processes_terminated?
      record_terminated_processes
    end
  end

  exit_indicating_status_of_processes
end

#start_processor(event_processor) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 42

# Forks a child process that runs the given event processor and remembers
# which processor belongs to the child's pid.
#
# @param event_processor the processor to run in the child
# @return the event_processor that was passed in (value of the final
#   assignment)
def start_processor(event_processor)
  esp_process = ESPProcess.new(event_processor: event_processor,
                               event_source: @event_source,
                               after_fork: @after_fork)

  # The block runs only in the child; the parent receives the child's pid.
  child_pid = Process.fork do
    esp_process.start
  end

  @pids[child_pid] = event_processor
end