Class: Whodunit::Chronicles::Processor

Inherits:
Object
  • Object
show all
Includes:
Connection, Persistence, Table
Defined in:
lib/whodunit/chronicles/processor.rb

Overview

Processes database change events and creates chronicle records.

Transforms ChangeEvent objects into structured chronicle records with complete object serialization and metadata.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Persistence

#build_record_params, #persist_record, #persist_record_mysql, #persist_record_postgresql, #persist_records_batch, #persist_records_batch_mysql, #persist_records_batch_postgresql

Methods included from Table

#create_mysql_table, #create_postgresql_table, #ensure_table_exists

Methods included from Connection

#connection_active?, #create_connection, #detect_database_type, #ensure_connection, #parse_mysql_url, #setup_connection_specifics

Constructor Details

#initialize(audit_database_url: Chronicles.config.audit_database_url, logger: Chronicles.logger) ⇒ Processor

Returns a new instance of Processor.



16
17
18
19
20
21
22
23
# File 'lib/whodunit/chronicles/processor.rb', line 16

# Sets up a processor without opening a database connection.
#
# @param audit_database_url [Object] kept in @audit_database_url; defaults to
#   Chronicles.config.audit_database_url
# @param logger [Object] kept in @logger; defaults to Chronicles.logger
def initialize(audit_database_url: Chronicles.config.audit_database_url, logger: Chronicles.logger)
  @logger = logger
  @audit_database_url = audit_database_url
  # The connection slot starts empty; nothing is opened here.
  @connection = nil
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



14
15
16
# File 'lib/whodunit/chronicles/processor.rb', line 14

# Reader for the value held in @connection.
#
# @return [Object, nil] the stored connection, or nil when none is set
def connection = @connection

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



14
15
16
# File 'lib/whodunit/chronicles/processor.rb', line 14

# Reader for the value held in @logger.
#
# @return [Object, nil] the stored logger, or nil when none is set
def logger = @logger

Instance Method Details

#build_metadata(change_event) ⇒ Object (private)



112
113
114
115
116
117
118
119
120
# File 'lib/whodunit/chronicles/processor.rb', line 112

# Assembles the metadata hash attached to a chronicle record.
#
# @param change_event [ChangeEvent] the change being chronicled
# @return [Hash] entries for :table_schema, :qualified_table_name,
#   :changed_columns, :adapter_metadata and :chronicles_version
def build_metadata(change_event)
  {
    table_schema: change_event.schema_name,
    qualified_table_name: change_event.qualified_table_name,
    changed_columns: change_event.changed_columns,
    # NOTE(review): the accessor name was missing from this listing;
    # `metadata` is assumed — confirm against ChangeEvent.
    adapter_metadata: change_event.metadata,
    chronicles_version: Chronicles::VERSION,
  }
end

#build_record(change_event) ⇒ Object (private)



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/whodunit/chronicles/processor.rb', line 78

# Builds the chronicle record hash for a single change event.
#
# User attribution comes from extract_user_info and the nested metadata
# from build_metadata; every other field is read off the event itself.
#
# @param change_event [ChangeEvent] the change being chronicled
# @return [Hash] record attributes keyed by symbol; :id is nil here
def build_record(change_event)
  user_info = extract_user_info(change_event)

  {
    id: nil, # Will be set by database
    table_name: change_event.table_name,
    schema_name: change_event.schema_name,
    record_id: change_event.primary_key,
    action: change_event.action,
    old_data: change_event.old_data,
    new_data: change_event.new_data,
    changes: change_event.changes,
    user_id: user_info[:user_id],
    user_type: user_info[:user_type],
    transaction_id: change_event.transaction_id,
    sequence_number: change_event.sequence_number,
    occurred_at: change_event.timestamp,
    # Time this record hash was built, as opposed to when the change occurred.
    created_at: Time.now,
    metadata: build_metadata(change_event),
  }
end

#close ⇒ Object

Close database connection



71
72
73
74
# File 'lib/whodunit/chronicles/processor.rb', line 71

# Closes the held connection, if there is one, and forgets it.
#
# @return [nil]
def close
  @connection.close unless @connection.nil?
  @connection = nil
end

#extract_user_info(change_event) ⇒ Object (private)



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/whodunit/chronicles/processor.rb', line 100

# Derives user attribution for a change event from its current row data.
#
# @param change_event [ChangeEvent] event whose current_data is inspected
# @return [Hash] :user_id is the first non-nil of creator_id, updater_id,
#   deleter_id (or nil); :user_type is 'User' when an id was found, else nil
def extract_user_info(change_event)
  # Treat a missing row as an empty hash so the lookups below stay safe.
  data = change_event.current_data || {}

  # Look for Whodunit user attribution fields
  # NOTE(review): creator_id wins over updater_id whenever both are present —
  # confirm that is the intended precedence for update events.
  user_id = data['creator_id'] || data['updater_id'] || data['deleter_id']

  {
    user_id: user_id,
    user_type: user_id ? 'User' : nil,
  }
end

#log(level, message, context = {}) ⇒ Object (private)



122
123
124
# File 'lib/whodunit/chronicles/processor.rb', line 122

# Forwards a message to the logger, tagged with this processor's name.
#
# @param level [Symbol] name of the logger method to invoke
# @param message [String] text to log
# @param context [Hash] extra entries passed to the logger as keywords
# @return [Object] whatever the logger method returns
def log(level, message, context = {})
  # Context entries come last, so a :processor key in context overrides the tag.
  tagged = { processor: 'Processor', **context }
  logger.public_send(level, message, **tagged)
end

#process(change_event) ⇒ Hash

Process a change event and create chronicle record

Parameters:

  • change_event (ChangeEvent)

    The database change to chronicle

Returns:

  • (Hash)

    The created chronicle record



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/whodunit/chronicles/processor.rb', line 29

# Chronicles one change event: builds its record, persists it and logs it.
#
# @param change_event [ChangeEvent] the database change to chronicle
# @return [Hash] the record produced by build_record
# @raise [StandardError] any failure is logged at :error level and re-raised
def process(change_event)
  ensure_connection

  build_record(change_event).tap do |chronicle|
    persist_record(chronicle)

    log(:debug, 'Processed change event',
      table: change_event.qualified_table_name,
      action: change_event.action,
      id: chronicle[:id])
  end
rescue StandardError => error
  log(:error, 'Failed to process change event', error: error.message, event: change_event.to_s)
  raise
end

#process_batch(change_events) ⇒ Array<Hash>

Process multiple change events in a batch

Parameters:

  • change_events (Array<ChangeEvent>)

    Array of change events

Returns:

  • (Array<Hash>)

    Array of created chronicle records



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/whodunit/chronicles/processor.rb', line 52

# Chronicles several change events with a single batch persist call.
#
# @param change_events [Array<ChangeEvent>] events to chronicle
# @return [Array<Hash>] one record per event, in input order; [] when the
#   input is empty (no connection is requested in that case)
# @raise [StandardError] any failure is logged at :error level and re-raised
def process_batch(change_events)
  return [] if change_events.empty?

  ensure_connection

  change_events.map { |change_event| build_record(change_event) }.tap do |chronicles|
    persist_records_batch(chronicles)

    log(:info, 'Processed batch of change events', count: change_events.size)
  end
rescue StandardError => error
  log(:error, 'Failed to process batch', error: error.message, count: change_events.size)
  raise
end