Class: Whodunit::Chronicles::Service
- Inherits:
-
Object
- Object
- Whodunit::Chronicles::Service
- Defined in:
- lib/whodunit/chronicles/service.rb
Overview
Main service orchestrator for chronicle streaming
Coordinates the stream adapter and processor to provide a complete chronicle streaming solution with error handling and monitoring.
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#processor ⇒ Object
readonly
Returns the value of attribute processor.
Instance Method Summary collapse
- #build_adapter ⇒ Object private
- #handle_streaming_error(error) ⇒ Object private
-
#initialize(adapter: nil, processor: nil, logger: Chronicles.logger) ⇒ Service
constructor
A new instance of Service.
- #log(level, message, context = {}) ⇒ Object private
- #process_change_event(change_event) ⇒ Object private
-
#running? ⇒ Boolean
Check if service is running.
-
#setup! ⇒ void
Set up the chronicle streaming infrastructure.
- #should_chronicle_table?(change_event) ⇒ Boolean private
- #should_retry? ⇒ Boolean private
-
#start ⇒ self
Start the chronicle streaming service.
- #start_streaming_with_retry ⇒ Object private
-
#status ⇒ Hash
Get service status information.
-
#stop ⇒ void
Stop the chronicle streaming service.
-
#teardown! ⇒ void
Tear down the chronicle streaming infrastructure.
- #test_connections! ⇒ Object private
- #validate_setup! ⇒ Object private
Constructor Details
#initialize(adapter: nil, processor: nil, logger: Chronicles.logger) ⇒ Service
Returns a new instance of Service.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/whodunit/chronicles/service.rb', line 14 def initialize( adapter: nil, processor: nil, logger: Chronicles.logger ) @adapter = adapter || build_adapter @processor = processor || Processor.new(logger: logger) @logger = logger @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 4, max_queue: 100, fallback_policy: :caller_runs, ) @running = false @retry_count = 0 end |
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12 def adapter @adapter end |
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12 def executor @executor end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12 def logger @logger end |
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12 def processor @processor end |
Instance Method Details
#build_adapter ⇒ Object (private)
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/whodunit/chronicles/service.rb', line 118 def build_adapter case Chronicles.config.adapter when :postgresql Adapters::PostgreSQL.new(logger: logger) when :mysql Adapters::MySQL.new(logger: logger) else raise ConfigurationError, "Unsupported adapter: #{Chronicles.config.adapter}" end end |
#handle_streaming_error(error) ⇒ Object (private)
186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/whodunit/chronicles/service.rb', line 186 def handle_streaming_error(error) @retry_count += 1 log(:error, 'Streaming error occurred', error: error., retry_count: @retry_count, max_retries: Chronicles.config.max_retry_attempts ) # Wait before retry sleep(Chronicles.config.retry_delay) if should_retry? end |
#log(level, message, context = {}) ⇒ Object (private)
202 203 204 |
# File 'lib/whodunit/chronicles/service.rb', line 202 def log(level, , context = {}) logger.public_send(level, , service: 'Chronicles::Service', **context) end |
#process_change_event(change_event) ⇒ Object (private)
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/whodunit/chronicles/service.rb', line 162 def process_change_event(change_event) return unless change_event return unless should_chronicle_table?(change_event) log(:debug, 'Processing change event', table: change_event.qualified_table_name, action: change_event.action ) processor.process(change_event) rescue StandardError => e log(:error, 'Failed to process change event', error: e., event: change_event.to_s ) end |
#running? ⇒ Boolean
Check if service is running
76 77 78 |
# File 'lib/whodunit/chronicles/service.rb', line 76 def running? @running end |
#setup! ⇒ void
This method returns an undefined value.
Set up the chronicle streaming infrastructure
100 101 102 103 104 |
# File 'lib/whodunit/chronicles/service.rb', line 100 def setup! log(:info, 'Setting up chronicle streaming infrastructure') adapter.setup log(:info, 'Chronicle streaming infrastructure setup completed') end |
#should_chronicle_table?(change_event) ⇒ Boolean (private)
179 180 181 182 183 184 |
# File 'lib/whodunit/chronicles/service.rb', line 179 def should_chronicle_table?(change_event) Chronicles.config.chronicle_table?( change_event.table_name, change_event.schema_name, ) end |
#should_retry? ⇒ Boolean (private)
198 199 200 |
# File 'lib/whodunit/chronicles/service.rb', line 198 def should_retry? running? && @retry_count < Chronicles.config.max_retry_attempts end |
#start ⇒ self
Start the chronicle streaming service
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/whodunit/chronicles/service.rb', line 35 def start return self if running? log(:info, 'Starting Chronicles streaming service') validate_setup! test_connections! @running = true @retry_count = 0 start_streaming_with_retry log(:info, 'Chronicles streaming service started successfully') self rescue StandardError => e log(:error, 'Failed to start service', error: e.) @running = false raise end |
#start_streaming_with_retry ⇒ Object (private)
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/whodunit/chronicles/service.rb', line 145 def start_streaming_with_retry @executor.post do loop do break unless running? begin adapter.start_streaming do |change_event| process_change_event(change_event) end rescue StandardError => e handle_streaming_error(e) break unless should_retry? end end end end |
#status ⇒ Hash
Get service status information
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/whodunit/chronicles/service.rb', line 83 def status { running: running?, adapter_streaming: adapter.streaming?, adapter_position: adapter.current_position, retry_count: @retry_count, executor_status: { active_count: @executor.active_count, completed_task_count: @executor.completed_task_count, queue_length: @executor.queue_length, }, } end |
#stop ⇒ void
This method returns an undefined value.
Stop the chronicle streaming service
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/whodunit/chronicles/service.rb', line 59 def stop return unless running? log(:info, 'Stopping Chronicles streaming service') @running = false adapter.stop_streaming if adapter.streaming? @executor.shutdown @executor.wait_for_termination(timeout: 30) processor.close log(:info, 'Chronicles streaming service stopped') end |
#teardown! ⇒ void
This method returns an undefined value.
Tear down the chronicle streaming infrastructure
109 110 111 112 113 114 |
# File 'lib/whodunit/chronicles/service.rb', line 109 def teardown! log(:info, 'Tearing down chronicle streaming infrastructure') stop if running? adapter.teardown log(:info, 'Chronicle streaming infrastructure teardown completed') end |
#test_connections! ⇒ Object (private)
137 138 139 140 141 142 143 |
# File 'lib/whodunit/chronicles/service.rb', line 137 def test_connections! adapter.test_connection # Test processor connection by creating a dummy connection processor.send(:ensure_connection) rescue StandardError => e raise AdapterError, "Connection test failed: #{e.}" end |
#validate_setup! ⇒ Object (private)
129 130 131 132 133 134 135 |
# File 'lib/whodunit/chronicles/service.rb', line 129 def validate_setup! Chronicles.config.validate! return if adapter.test_connection raise AdapterError, 'Failed to connect to source database' end |