Class: Whodunit::Chronicles::Adapters::MySQL
- Inherits:
-
StreamAdapter
- Object
- StreamAdapter
- Whodunit::Chronicles::Adapters::MySQL
- Defined in:
- lib/whodunit/chronicles/adapters/mysql.rb
Overview
MySQL/MariaDB binary log streaming adapter
Uses MySQL's binary log replication to stream database changes without impacting application performance.
Constant Summary collapse
- DEFAULT_SERVER_ID =
1001
Instance Attribute Summary collapse
-
#binlog_file ⇒ Object
readonly
Returns the value of attribute binlog_file.
-
#binlog_position ⇒ Object
readonly
Returns the value of attribute binlog_position.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#database_url ⇒ Object
readonly
Returns the value of attribute database_url.
-
#server_id ⇒ Object
readonly
Returns the value of attribute server_id.
Attributes inherited from StreamAdapter
Instance Method Summary collapse
- #close_connection ⇒ Object private
-
#current_position ⇒ Object
Get current replication position.
- #enable_binlog_checksum ⇒ Object private
- #ensure_setup ⇒ Object private
- #establish_connection ⇒ Object private
- #fetch_current_position ⇒ Object private
-
#initialize(database_url: Chronicles.config.database_url, server_id: DEFAULT_SERVER_ID, logger: Chronicles.logger) ⇒ MySQL
constructor
A new instance of MySQL.
- #parse_database_url(url) ⇒ Object private
- #process_binlog_stream {|change_event| ... } ⇒ Object private
- #register_replica_server ⇒ Object private
- #request_binlog_dump ⇒ Object private
-
#setup ⇒ Object
Set up binary log replication.
-
#start_streaming ⇒ Object
Start streaming binary log changes.
-
#stop_streaming ⇒ Object
Stop streaming.
- #stream_binlog_events ⇒ Object private
-
#teardown ⇒ Object
Remove binary log replication setup (minimal cleanup needed).
-
#test_connection ⇒ Object
Test database connection.
- #validate_binlog_format ⇒ Object private
- #validate_server_id ⇒ Object private
Methods inherited from StreamAdapter
Constructor Details
#initialize(database_url: Chronicles.config.database_url, server_id: DEFAULT_SERVER_ID, logger: Chronicles.logger) ⇒ MySQL
Returns a new instance of MySQL.
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 18 def initialize( database_url: Chronicles.config.database_url, server_id: DEFAULT_SERVER_ID, logger: Chronicles.logger ) super(logger: logger) @database_url = database_url @server_id = server_id @connection = nil @binlog_file = nil @binlog_position = nil @binlog_checksum = true end |
Instance Attribute Details
#binlog_file ⇒ Object (readonly)
Returns the value of attribute binlog_file.
16 17 18 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16 def binlog_file @binlog_file end |
#binlog_position ⇒ Object (readonly)
Returns the value of attribute binlog_position.
16 17 18 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16 def binlog_position @binlog_position end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
16 17 18 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16 def connection @connection end |
#database_url ⇒ Object (readonly)
Returns the value of attribute database_url.
16 17 18 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16 def database_url @database_url end |
#server_id ⇒ Object (readonly)
Returns the value of attribute server_id.
16 17 18 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16 def server_id @server_id end |
Instance Method Details
#close_connection ⇒ Object (private)
132 133 134 135 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 132 def close_connection @connection&.close @connection = nil end |
#current_position ⇒ Object
Get current replication position
65 66 67 68 69 70 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 65 def current_position return "#{@binlog_file}:#{@binlog_position}" if @binlog_file && @binlog_position fetch_current_position "#{@binlog_file}:#{@binlog_position}" end |
#enable_binlog_checksum ⇒ Object (private)
180 181 182 183 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 180 def enable_binlog_checksum @connection.query('SET @master_binlog_checksum = @@global.binlog_checksum') log(:debug, 'Binary log checksum enabled') end |
#ensure_setup ⇒ Object (private)
149 150 151 152 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 149 def ensure_setup validate_binlog_format validate_server_id end |
#establish_connection ⇒ Object (private)
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 110 def establish_connection return if @connection&.ping parsed_url = parse_database_url(@database_url) @connection = Trilogy.new( host: parsed_url[:host], port: parsed_url[:port] || 3306, username: parsed_url[:username], password: parsed_url[:password], database: parsed_url[:database], ssl: parsed_url[:ssl], ) log(:debug, 'Established MySQL connection', host: parsed_url[:host], database: parsed_url[:database]) rescue StandardError => e log(:error, 'Failed to establish connection', error: e.) raise AdapterError, "Connection failed: #{e.message}" end |
#fetch_current_position ⇒ Object (private)
185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 185 def fetch_current_position result = @connection.query('SHOW MASTER STATUS') status = result.first raise ReplicationError, 'Unable to fetch master status - binary logging may be disabled' unless status @binlog_file = status['File'] @binlog_position = status['Position'] log(:debug, 'Fetched master position', file: @binlog_file, position: @binlog_position) end |
#parse_database_url(url) ⇒ Object (private)
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 137 def parse_database_url(url) uri = URI.parse(url) { host: uri.host, port: uri.port, username: uri.user, password: uri.password, database: uri.path&.sub('/', ''), ssl: uri.query&.include?('ssl=true'), } end |
#process_binlog_stream {|change_event| ... } ⇒ Object (private)
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 230 def process_binlog_stream(&) # This would process the binary log event stream # Each event would be parsed and converted to a ChangeEvent log(:info, 'Processing binary log stream (placeholder implementation)') # Placeholder: In a real implementation, this would: # 1. Read binary log events from the stream # 2. Parse event headers and data # 3. Convert to ChangeEvent objects # 4. Yield each event to the block # For now, we'll simulate with a warning log(:warn, 'MySQL binary log streaming requires full protocol implementation') # Yield a placeholder change event to demonstrate the interface change_event = ChangeEvent.new( table_name: 'example_table', action: 'INSERT', primary_key: { id: 1 }, new_data: { id: 1, name: 'test' }, old_data: nil, timestamp: Time.now, metadata: { position: current_position }, ) yield(change_event) if block_given? end |
#register_replica_server ⇒ Object (private)
212 213 214 215 216 217 218 219 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 212 def register_replica_server # This would typically use COM_REGISTER_SLAVE MySQL protocol command # For now, we'll use a simplified approach log(:debug, 'Registering as replica server', server_id: @server_id) # NOTE: Full implementation would require low-level MySQL protocol handling # This is a placeholder for the binary log streaming setup end |
#request_binlog_dump ⇒ Object (private)
221 222 223 224 225 226 227 228 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 221 def request_binlog_dump log(:debug, 'Requesting binary log dump', file: @binlog_file, position: @binlog_position) # This would use COM_BINLOG_DUMP MySQL protocol command # Full implementation requires binary protocol handling end |
#setup ⇒ Object
Set up binary log replication
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 73 def setup log(:info, 'Setting up MySQL binary log replication') establish_connection validate_binlog_format validate_server_id enable_binlog_checksum log(:info, 'MySQL setup completed successfully') end |
#start_streaming ⇒ Object
Start streaming binary log changes
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 33 def start_streaming(&) raise ArgumentError, 'Block required for processing events' unless block_given? log(:info, 'Starting MySQL binary log streaming') establish_connection ensure_setup self.running = true fetch_current_position log(:info, 'Starting replication from position', file: @binlog_file, position: @binlog_position) begin stream_binlog_events(&) rescue StandardError => e log(:error, 'Streaming error', error: e., backtrace: e.backtrace.first(5)) raise ReplicationError, "Failed to stream changes: #{e.message}" ensure self.running = false end end |
#stop_streaming ⇒ Object
Stop streaming
58 59 60 61 62 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 58 def stop_streaming log(:info, 'Stopping MySQL binary log streaming') self.running = false close_connection end |
#stream_binlog_events ⇒ Object (private)
198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 198 def stream_binlog_events(&) # Register as replica server register_replica_server # Request binary log dump request_binlog_dump # Process binary log events process_binlog_stream(&) rescue StandardError => e log(:error, 'Binary log streaming error', error: e.) raise end |
#teardown ⇒ Object
Remove binary log replication setup (minimal cleanup needed)
85 86 87 88 89 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 85 def teardown log(:info, 'Tearing down MySQL binary log replication') close_connection log(:info, 'MySQL teardown completed') end |
#test_connection ⇒ Object
Test database connection
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 92 def test_connection establish_connection result = @connection.query('SELECT @@hostname, @@version, @@server_id') info = result.first log(:info, 'Connection test successful', hostname: info['@@hostname'], version: info['@@version'], server_id: info['@@server_id']) true rescue StandardError => e log(:error, 'Connection test failed', error: e.) false end |
#validate_binlog_format ⇒ Object (private)
154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 154 def validate_binlog_format result = @connection.query('SELECT @@binlog_format') format = result.first['@@binlog_format'] unless %w[ROW MIXED].include?(format) raise ReplicationError, "Binary log format must be ROW or MIXED, currently: #{format}" end log(:debug, 'Binary log format validated', format: format) end |
#validate_server_id ⇒ Object (private)
166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 166 def validate_server_id result = @connection.query('SELECT @@server_id') current_server_id = result.first['@@server_id'].to_i if current_server_id == @server_id raise ReplicationError, "Server ID conflict: #{@server_id} is already in use" end log(:debug, 'Server ID validated', current: current_server_id, replication: @server_id) end |