Class: EventSourcery::EventProcessing::ESPRunner
- Inherits: Object
  - Object
  - EventSourcery::EventProcessing::ESPRunner
- Defined in: lib/event_sourcery/event_processing/esp_runner.rb
Overview
NOTE: Databases should be disconnected before running this, for example: EventSourcery.config.postgres.event_store_database.disconnect
Instance Method Summary
- #initialize(event_processors:, event_source:, max_seconds_for_processes_to_terminate: 30, shutdown_requested: false, after_fork: nil, after_subprocess_termination: nil, logger: EventSourcery.logger) ⇒ ESPRunner (constructor)
  A new instance of ESPRunner.
- #shutdown ⇒ Object
- #shutdown_requested? ⇒ Boolean
- #start! ⇒ Object
  Start each Event Stream Processor in a new child process.
- #start_processor(event_processor) ⇒ Object
Constructor Details
#initialize(event_processors:, event_source:, max_seconds_for_processes_to_terminate: 30, shutdown_requested: false, after_fork: nil, after_subprocess_termination: nil, logger: EventSourcery.logger) ⇒ ESPRunner
Returns a new instance of ESPRunner.
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 6

    def initialize(event_processors:,
                   event_source:,
                   max_seconds_for_processes_to_terminate: 30,
                   shutdown_requested: false,
                   after_fork: nil,
                   after_subprocess_termination: nil,
                   logger: EventSourcery.logger)
      @event_processors = event_processors
      @event_source = event_source
      @pids = {}
      @max_seconds_for_processes_to_terminate = max_seconds_for_processes_to_terminate
      @shutdown_requested = shutdown_requested
      @exit_status = true
      @after_fork = after_fork
      @after_subprocess_termination = after_subprocess_termination
      @logger = logger
    end
Instance Method Details
#shutdown ⇒ Object
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 52

    def shutdown
      @shutdown_requested = true
    end
#shutdown_requested? ⇒ Boolean
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 56

    def shutdown_requested?
      @shutdown_requested
    end
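The shutdown flag above is typically flipped from a signal handler. The following is a minimal sketch of that pattern, not the ESPRunner implementation: SketchRunner, its signal list (TERM and INT), and shutdown_flag_set_by_signal? are hypothetical names introduced for illustration, and the private listen_for_shutdown_signals method of ESPRunner is not shown on this page.

```ruby
# Illustrative only: SketchRunner is a hypothetical class, not EventSourcery code.
class SketchRunner
  def initialize
    @shutdown_requested = false
  end

  def shutdown
    @shutdown_requested = true
  end

  def shutdown_requested?
    @shutdown_requested
  end

  # Assumption: TERM and INT are the signals that should request shutdown.
  def listen_for_shutdown_signals(signals = %i[TERM INT])
    signals.each { |signal| Signal.trap(signal) { shutdown } }
  end
end

# Runs the demo in a child process so the traps do not leak into the caller.
# Returns true if the flag was set after the child signalled itself.
def shutdown_flag_set_by_signal?(signal = :TERM)
  pid = Process.fork do
    runner = SketchRunner.new
    runner.listen_for_shutdown_signals
    Process.kill(signal, Process.pid)
    # Trap handlers run asynchronously, so poll briefly for the flag.
    100.times do
      break if runner.shutdown_requested?
      sleep 0.01
    end
    exit!(runner.shutdown_requested? ? 0 : 1)
  end
  _, status = Process.wait2(pid)
  status.exitstatus == 0
end

puts shutdown_flag_set_by_signal?(:TERM)
```

The handler only sets a flag; the main loop decides when to act on it, which keeps the work done inside the trap context to a minimum.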
#start! ⇒ Object
Start each Event Stream Processor in a new child process.
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 25

    def start!
      with_logging do
        start_processes
        listen_for_shutdown_signals
        while_waiting_for_shutdown do
          record_terminated_processes
        end
        terminate_remaining_processes
        until all_processes_terminated? || waited_long_enough?
          record_terminated_processes
        end
        kill_remaining_processes
        record_terminated_processes until all_processes_terminated?
      end
      exit_indicating_status_of_processes
    end
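The second half of start! reads as a graceful-then-forced shutdown: ask the children to stop, wait a bounded time, then force the stragglers. The sketch below illustrates that general pattern with the Ruby standard library only. It assumes that "terminate" means sending TERM and "kill" means sending KILL; the private helpers of ESPRunner are not shown on this page, so fork_sleeper, reap_exited, stop_children, and monotonic_now are hypothetical names.

```ruby
# Illustrative sketch; none of these helpers are EventSourcery APIs.

def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Forks a child that runs `setup` (to install traps), reports readiness
# through a pipe, then sleeps until signalled. Returns the child pid.
def fork_sleeper(&setup)
  reader, writer = IO.pipe
  pid = Process.fork do
    reader.close
    setup.call
    writer.write("1")
    writer.close
    sleep
  end
  writer.close
  reader.read(1) # block until the child has installed its traps
  reader.close
  pid
end

# Collects children that have already exited, without blocking.
def reap_exited(remaining, statuses)
  remaining.reject! do |pid|
    reaped, status = Process.wait2(pid, Process::WNOHANG)
    statuses[pid] = status if reaped
    reaped
  end
end

# Sends TERM, waits up to grace_seconds, then sends KILL to whatever is left.
# Returns a hash of pid => Process::Status.
def stop_children(pids, grace_seconds:)
  remaining = pids.dup
  statuses = {}
  remaining.each { |pid| Process.kill(:TERM, pid) }
  deadline = monotonic_now + grace_seconds
  until remaining.empty? || monotonic_now >= deadline
    reap_exited(remaining, statuses)
    sleep 0.05
  end
  remaining.each { |pid| Process.kill(:KILL, pid) }
  remaining.each { |pid| statuses[pid] = Process.wait2(pid).last }
  statuses
end

polite   = fork_sleeper { trap(:TERM) { exit!(0) } }
stubborn = fork_sleeper { trap(:TERM, "IGNORE") }
statuses = stop_children([polite, stubborn], grace_seconds: 0.5)
puts "polite exit status: #{statuses[polite].exitstatus}"
puts "stubborn killed by signal: #{statuses[stubborn].termsig}"
```

The grace period plays the role that max_seconds_for_processes_to_terminate (default 30) appears to play in the constructor: it bounds how long a child that ignores the first signal can delay shutdown.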
#start_processor(event_processor) ⇒ Object
# File 'lib/event_sourcery/event_processing/esp_runner.rb', line 42

    def start_processor(event_processor)
      process = ESPProcess.new(
        event_processor: event_processor,
        event_source: @event_source,
        after_fork: @after_fork,
      )
      pid = Process.fork { process.start }
      @pids[pid] = event_processor
    end
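start_processor forks one child per processor and records the child pid against the processor, so that an exit can later be attributed to the processor that owned it. A minimal standalone sketch of that bookkeeping follows. SketchProcessor, start_processors, and wait_for_all are hypothetical names; the stand-in processor simply exits with a fixed code, whereas a real processor would run until told to stop.

```ruby
# Illustrative only: a stand-in for an event processor, not EventSourcery code.
SketchProcessor = Struct.new(:name, :exit_code) do
  def start
    # exit! skips at_exit handlers inherited from the parent process.
    exit!(exit_code)
  end
end

# Forks one child per processor and returns a hash of pid => processor.
def start_processors(processors)
  processors.each_with_object({}) do |processor, pids|
    pid = Process.fork { processor.start }
    pids[pid] = processor
  end
end

# Waits for every child and returns a hash of processor name => exit status.
def wait_for_all(pids)
  pids.each_with_object({}) do |(pid, processor), results|
    _, status = Process.wait2(pid)
    results[processor.name] = status.exitstatus
  end
end

pids = start_processors([
  SketchProcessor.new("projector", 0),
  SketchProcessor.new("reactor", 3),
])
wait_for_all(pids).each do |name, code|
  puts "#{name} exited with status #{code}"
end
```

Keying the hash by pid is what lets the parent map a Process::Status back to a processor; a runner could then fold the individual statuses into a single exit code for the parent process.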