Class: Whodunit::Chronicles::Adapters::MySQL

Inherits:
StreamAdapter show all
Defined in:
lib/whodunit/chronicles/adapters/mysql.rb

Overview

MySQL/MariaDB binary log streaming adapter

Uses MySQL's binary log replication to stream database changes without impacting application performance.

Constant Summary collapse

DEFAULT_SERVER_ID =
1001

Instance Attribute Summary collapse

Attributes inherited from StreamAdapter

#logger, #position, #running

Instance Method Summary collapse

Methods inherited from StreamAdapter

#log, #streaming?

Constructor Details

#initialize(database_url: Chronicles.config.database_url, server_id: DEFAULT_SERVER_ID, logger: Chronicles.logger) ⇒ MySQL

Returns a new instance of MySQL.



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 18

# Builds the adapter. Only stores configuration and resets replication state;
# no database connection is opened here.
#
# @param database_url [String] connection URL, later handed to URI.parse
# @param server_id [Integer] ID stored for this adapter (presumably the replica
#   server ID used when registering - confirm against the protocol code)
# @param logger [Object] forwarded unchanged to the StreamAdapter constructor
def initialize(database_url: Chronicles.config.database_url, server_id: DEFAULT_SERVER_ID, logger: Chronicles.logger)
  super(logger: logger)

  # Caller-supplied configuration.
  @server_id = server_id
  @database_url = database_url

  # Replication state; populated once a connection exists.
  @connection = nil
  @binlog_file = @binlog_position = nil
  @binlog_checksum = true
end

Instance Attribute Details

#binlog_file ⇒ Object (readonly)

Returns the value of attribute binlog_file.



16
17
18
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16

# @return [Object, nil] the stored binlog file name; nil until a position has
#   been fetched from the server
def binlog_file = @binlog_file

#binlog_position ⇒ Object (readonly)

Returns the value of attribute binlog_position.



16
17
18
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16

# @return [Object, nil] the stored binlog offset; nil until a position has been
#   fetched from the server
def binlog_position = @binlog_position

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



16
17
18
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16

# @return [Object, nil] the current database client, or nil when no connection
#   is held
def connection = @connection

#database_url ⇒ Object (readonly)

Returns the value of attribute database_url.



16
17
18
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16

# @return [Object] the connection URL this adapter was constructed with
def database_url = @database_url

#server_id ⇒ Object (readonly)

Returns the value of attribute server_id.



16
17
18
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 16

# @return [Object] the server ID this adapter was constructed with
def server_id = @server_id

Instance Method Details

#close_connection ⇒ Object (private)



132
133
134
135
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 132

# Closes the current connection (if any) and forgets it.
#
# The reference is cleared in an `ensure` so that a failing `close` cannot
# leave a broken handle behind for later calls to reuse. Any error raised by
# `close` still propagates to the caller.
#
# @return [nil]
def close_connection
  @connection&.close
  nil
ensure
  @connection = nil
end

#current_position ⇒ Object

Get current replication position



65
66
67
68
69
70
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 65

# Returns the replication position as "<binlog file>:<binlog offset>".
#
# When both values are already known they are returned as-is. Otherwise the
# position is fetched from the server; a connection is established first
# because this public method can be called before any other method has
# connected (fetch_current_position dereferences @connection directly).
#
# @return [String] position formatted as "file:offset"
# @raise [AdapterError] if establish_connection fails
# @raise [ReplicationError] if fetch_current_position cannot read the position
def current_position
  unless @binlog_file && @binlog_position
    establish_connection
    fetch_current_position
  end

  "#{@binlog_file}:#{@binlog_position}"
end

#enable_binlog_checksum ⇒ Object (private)



180
181
182
183
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 180

# Copies the server's global binlog_checksum setting into the session user
# variable @master_binlog_checksum on the current connection, then logs it.
def enable_binlog_checksum
  statement = 'SET @master_binlog_checksum = @@global.binlog_checksum'
  @connection.query(statement)

  log(:debug, 'Binary log checksum enabled')
end

#ensure_setup ⇒ Object (private)



149
150
151
152
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 149

# Runs the per-connection checks and session setup needed before streaming.
#
# Mirrors the steps `setup` performs after connecting. The checksum step is
# included because it sets a session-scoped user variable: it has to be issued
# on the connection that will stream, and `start_streaming` calls this method
# (not `setup`) after (re)connecting.
#
# @raise [ReplicationError] if either validation fails
def ensure_setup
  validate_binlog_format
  validate_server_id
  enable_binlog_checksum
end

#establish_connection ⇒ Object (private)



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 110

# Ensures @connection holds a usable Trilogy client.
#
# An existing connection is reused when it answers `ping`. A connection whose
# `ping` raises or returns a falsy value is treated as stale: it is closed on
# a best-effort basis and replaced, instead of the ping error being reported
# as a connection failure.
#
# @raise [AdapterError] if URL parsing or opening the new connection fails
def establish_connection
  if @connection
    begin
      return if @connection.ping
    rescue StandardError => e
      log(:debug, 'Existing MySQL connection is stale, reconnecting', error: e.message)
    end

    begin
      @connection.close
    rescue StandardError
      nil # best-effort: the handle is being discarded anyway
    end
    @connection = nil
  end

  parsed_url = parse_database_url(@database_url)

  @connection = Trilogy.new(
    host: parsed_url[:host],
    port: parsed_url[:port] || 3306, # fall back to the MySQL default port
    username: parsed_url[:username],
    password: parsed_url[:password],
    database: parsed_url[:database],
    ssl: parsed_url[:ssl],
  )

  log(:debug, 'Established MySQL connection',
    host: parsed_url[:host],
    database: parsed_url[:database])
rescue StandardError => e
  log(:error, 'Failed to establish connection', error: e.message)
  raise AdapterError, "Connection failed: #{e.message}"
end

#fetch_current_position ⇒ Object (private)

Raises:

  • (ReplicationError)



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 185

# Reads the server's current binlog file and offset into @binlog_file and
# @binlog_position.
#
# Rows are read through `each_hash` when the result offers it, because
# Trilogy's plain row enumeration yields positional Arrays that cannot be
# indexed by column name. Results that already yield Hash rows still work.
#
# NOTE(review): newer MySQL releases are reported to replace SHOW MASTER
# STATUS with SHOW BINARY LOG STATUS - confirm against the supported versions.
#
# @raise [ReplicationError] if the server returns no status row
def fetch_current_position
  result = @connection.query('SHOW MASTER STATUS')
  status = result.respond_to?(:each_hash) ? result.each_hash.first : result.first

  raise ReplicationError, 'Unable to fetch master status - binary logging may be disabled' unless status

  @binlog_file = status['File']
  @binlog_position = status['Position']
  log(:debug, 'Fetched master position',
    file: @binlog_file,
    position: @binlog_position)
end

#parse_database_url(url) ⇒ Object (private)



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 137

# Splits a database URL into the connection options used by
# establish_connection.
#
# - Username and password are percent-decoded, so credentials containing
#   reserved characters (e.g. "p%40ss" for "p@ss") reach the server verbatim.
# - SSL is enabled only by an actual `ssl=true` query parameter; a substring
#   such as `nossl=true` no longer counts.
#
# @param url [String] e.g. "mysql://user:secret@host:3306/dbname?ssl=true"
# @return [Hash] keys :host, :port, :username, :password, :database, :ssl
#   (:ssl is always true or false; the others may be nil when absent)
# @raise [URI::InvalidURIError] if the URL cannot be parsed
def parse_database_url(url)
  uri = URI.parse(url)

  # '+' is a literal character in the userinfo part, so shield it from the
  # form-style decoder (which would turn it into a space).
  decode = ->(part) { part && URI.decode_www_form_component(part.gsub('+', '%2B')) }
  ssl_values = URI.decode_www_form(uri.query.to_s).filter_map { |key, value| value if key == 'ssl' }

  {
    host: uri.host,
    port: uri.port,
    username: decode.call(uri.user),
    password: decode.call(uri.password),
    database: uri.path&.delete_prefix('/'),
    ssl: ssl_values.last == 'true',
  }
end

#process_binlog_stream {|change_event| ... } ⇒ Object (private)

Yields:

  • (change_event)


230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 230

# Placeholder for binlog event processing.
#
# A real implementation would read binlog events from the stream, parse their
# headers and payloads, convert them to ChangeEvent objects and yield each one.
# For now this logs that fact and yields a single hard-coded example event so
# callers can exercise the interface.
#
# @yield [change_event] the placeholder ChangeEvent
# @return [Object, nil] the block's result, or nil when no block is given
def process_binlog_stream(&)
  log(:info, 'Processing binary log stream (placeholder implementation)')
  log(:warn, 'MySQL binary log streaming requires full protocol implementation')

  # Fixed sample payload; only the timestamp and position vary per call.
  sample_attributes = {
    table_name: 'example_table',
    action: 'INSERT',
    primary_key: { id: 1 },
    new_data: { id: 1, name: 'test' },
    old_data: nil,
    timestamp: Time.now,
    metadata: { position: current_position },
  }
  sample_event = ChangeEvent.new(**sample_attributes)

  return unless block_given?

  yield sample_event
end

#register_replica_server ⇒ Object (private)



212
213
214
215
216
217
218
219
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 212

# Placeholder for replica registration: only logs the configured server ID.
#
# A full implementation would issue the COM_REGISTER_SLAVE protocol command,
# which needs low-level MySQL protocol handling not present here.
def register_replica_server
  context = { server_id: @server_id }
  log(:debug, 'Registering as replica server', **context)
end

#request_binlog_dump ⇒ Object (private)



221
222
223
224
225
226
227
228
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 221

# Placeholder for the binlog dump request: only logs the position that would
# be requested.
#
# A full implementation would issue the COM_BINLOG_DUMP protocol command,
# which needs binary protocol handling not present here.
def request_binlog_dump
  start_point = { file: @binlog_file, position: @binlog_position }
  log(:debug, 'Requesting binary log dump', **start_point)
end

#setup ⇒ Object

Set up binary log replication



73
74
75
76
77
78
79
80
81
82
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 73

# Prepares the adapter for replication: connects, validates the server's
# binlog format and server ID, and enables binlog checksum handling.
# Progress is logged at :info level before and after.
#
# @raise [AdapterError] if the connection cannot be established
# @raise [ReplicationError] if a validation step fails
def setup
  log(:info, 'Setting up MySQL binary log replication')

  establish_connection
  # Each step raises on failure, so later steps only run after earlier ones pass.
  %i[validate_binlog_format validate_server_id enable_binlog_checksum].each { |step| send(step) }

  log(:info, 'MySQL setup completed successfully')
end

#start_streaming ⇒ Object

Start streaming binary log changes

Raises:

  • (ArgumentError)


33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 33

# Starts streaming binary log changes, yielding each event to the block.
#
# Connects, runs the pre-stream checks, marks the adapter as running, fetches
# the starting position and then streams. The running flag is reset in an
# `ensure` that also covers the position fetch, so a failure there can no
# longer leave the adapter reporting itself as running.
#
# @yield [change_event] each event produced by the stream
# @raise [ArgumentError] if no block is given
# @raise [ReplicationError] if streaming fails (wrapping the original message);
#   errors from connecting, validation and the position fetch propagate unchanged
def start_streaming(&)
  raise ArgumentError, 'Block required for processing events' unless block_given?

  log(:info, 'Starting MySQL binary log streaming')

  establish_connection
  ensure_setup

  self.running = true
  begin
    fetch_current_position

    log(:info, 'Starting replication from position',
      file: @binlog_file, position: @binlog_position)

    begin
      stream_binlog_events(&)
    rescue StandardError => e
      log(:error, 'Streaming error', error: e.message, backtrace: e.backtrace.first(5))
      raise ReplicationError, "Failed to stream changes: #{e.message}"
    end
  ensure
    self.running = false
  end
end

#stop_streaming ⇒ Object

Stop streaming



58
59
60
61
62
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 58

# Stops streaming: logs the request, clears the running flag and then closes
# the database connection.
def stop_streaming
  log(:info, 'Stopping MySQL binary log streaming')
  # Flag is cleared before the connection is closed.
  self.running = false
  close_connection
end

#stream_binlog_events ⇒ Object (private)



198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 198

# Runs the three streaming stages in order - register as a replica, request
# the binlog dump, process the event stream - forwarding the caller's block
# to the last stage. Any StandardError is logged and re-raised unchanged.
def stream_binlog_events(&)
  register_replica_server
  request_binlog_dump
  process_binlog_stream(&)
rescue StandardError => failure
  log(:error, 'Binary log streaming error', error: failure.message)
  raise
end

#teardown ⇒ Object

Remove binary log replication setup (minimal cleanup needed)



85
86
87
88
89
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 85

# Tears down the adapter. The only cleanup performed is closing the database
# connection; a log line is written before and after.
def teardown
  log(:info, 'Tearing down MySQL binary log replication')
  close_connection
  log(:info, 'MySQL teardown completed')
end

#test_connection ⇒ Object

Test database connection



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 92

# Checks that the database is reachable by connecting and reading the server's
# hostname, version and server ID, which are logged on success.
#
# The row is read through `each_hash` when the result offers it: Trilogy's
# plain row enumeration yields positional Arrays, and indexing those by column
# name raises, which the rescue below would turn into a false result even on
# a healthy connection.
#
# @return [Boolean] true when the query succeeds, false on any StandardError
def test_connection
  establish_connection
  result = @connection.query('SELECT @@hostname, @@version, @@server_id')
  info = result.respond_to?(:each_hash) ? result.each_hash.first : result.first

  log(:info, 'Connection test successful',
    hostname: info['@@hostname'],
    version: info['@@version'],
    server_id: info['@@server_id'])

  true
rescue StandardError => e
  log(:error, 'Connection test failed', error: e.message)
  false
end

#validate_binlog_format ⇒ Object (private)



154
155
156
157
158
159
160
161
162
163
164
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 154

# Verifies that the server's binlog_format is ROW or MIXED.
#
# The row is read through `each_hash` when the result offers it, because
# Trilogy's plain row enumeration yields positional Arrays that cannot be
# indexed by column name. Results that already yield Hash rows still work.
#
# @raise [ReplicationError] if the format is anything other than ROW or MIXED
def validate_binlog_format
  result = @connection.query('SELECT @@binlog_format')
  row = result.respond_to?(:each_hash) ? result.each_hash.first : result.first
  format = row['@@binlog_format']

  unless %w[ROW MIXED].include?(format)
    raise ReplicationError,
      "Binary log format must be ROW or MIXED, currently: #{format}"
  end

  log(:debug, 'Binary log format validated', format: format)
end

#validate_server_id ⇒ Object (private)



166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/whodunit/chronicles/adapters/mysql.rb', line 166

# Verifies that the server ID configured for this adapter differs from the
# connected server's own @@server_id.
#
# The row is read through `each_hash` when the result offers it, because
# Trilogy's plain row enumeration yields positional Arrays that cannot be
# indexed by column name. Results that already yield Hash rows still work.
#
# @raise [ReplicationError] if both IDs are equal
def validate_server_id
  result = @connection.query('SELECT @@server_id')
  row = result.respond_to?(:each_hash) ? result.each_hash.first : result.first
  current_server_id = row['@@server_id'].to_i

  if current_server_id == @server_id
    raise ReplicationError,
      "Server ID conflict: #{@server_id} is already in use"
  end

  log(:debug, 'Server ID validated',
    current: current_server_id,
    replication: @server_id)
end