Class: MysqlBinlog::BinlogEventParser

Inherits:
Object
  • Object
show all
Defined in:
lib/mysql_binlog/binlog_event_parser.rb

Overview

Parse binary log events from a provided binary log. Must be driven externally, but handles all the details of parsing an event header and the content of the various event types.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(binlog_instance) ⇒ BinlogEventParser

Returns a new instance of BinlogEventParser.



191
192
193
194
195
196
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 191

# Build an event parser bound to the given binlog object.
#
# Keeps a reference to the binlog itself, caches its `reader` and
# `field_parser` for convenient access, and starts with an empty table map
# (filled in later as Table_map events are parsed).
#
# @param binlog_instance [Object] object responding to `reader` and
#   `field_parser`
def initialize(binlog_instance)
  @binlog = binlog_instance
  @reader, @parser = binlog_instance.reader, binlog_instance.field_parser
  @table_map = {}
end

Instance Attribute Details

#binlog ⇒ Object

The binary log object this event parser will parse events from.



182
183
184
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 182

# Reader for @binlog, the binlog object handed to the constructor.
#
# @return [Object] the value stored in @binlog
def binlog
  @binlog
end

#parser ⇒ Object

The binary log field parser extracted from the binlog object for convenience.



189
190
191
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 189

# Reader for @parser, which the constructor fills from the binlog object's
# `field_parser`.
#
# @return [Object] the value stored in @parser
def parser
  @parser
end

#reader ⇒ Object

The binary log reader extracted from the binlog object for convenience.



185
186
187
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 185

# Reader for @reader, which the constructor fills from the binlog object's
# `reader`.
#
# @return [Object] the value stored in @reader
def reader
  @reader
end

Instance Method Details

#diff_row_images(before, after) ⇒ Object



517
518
519
520
521
522
523
524
525
526
527
528
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 517

# Compare two row images position by position and report the columns whose
# values differ.
#
# Each element of both images must respond to `first` with a two-element
# pair whose second item is the column value (presumably a single-entry
# hash of column index => value — confirm against the row image builder).
#
# @param before [Array] row image prior to the change
# @param after [Array] row image following the change; indexed in step
#   with `before`
# @return [Hash] position => { before: old_value, after: new_value } for
#   every position whose values are not equal
def diff_row_images(before, after)
  before.each_with_index.each_with_object({}) do |(before_column, index), changes|
    after_column = after[index]
    old_value = before_column.first[1]
    new_value = after_column.first[1]
    next if old_value == new_value

    changes[index] = { before: old_value, after: new_value }
  end
end

#event_header ⇒ Object

Parse an event header, which is consistent for all event types.

Documented in sql/log_event.h line ~749 as “Common-Header”

Implemented in sql/log_event.cc line ~936 in Log_event::write_header



203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 203

# Read the common event header from the field parser.
#
# Fields are consumed in this order: 32-bit timestamp, 8-bit event type
# code, 32-bit server id, 32-bit event length, 32-bit next position, and a
# 2-byte flag bitmap decoded with EVENT_HEADER_FLAGS. A type code missing
# from EVENT_TYPES is reported as the symbol :unknown_<code>.
#
# @return [Hash] :timestamp, :event_type, :server_id, :event_length,
#   :next_position and :flags
def event_header
  timestamp = parser.read_uint32
  type_code = parser.read_uint8

  # Hash literal values are evaluated top to bottom, which keeps the reads
  # in wire order.
  {
    timestamp: timestamp,
    event_type: EVENT_TYPES[type_code] || :"unknown_#{type_code}",
    server_id: parser.read_uint32,
    event_length: parser.read_uint32,
    next_position: parser.read_uint32,
    flags: parser.read_uint_bitmap_by_size_and_name(2, EVENT_HEADER_FLAGS),
  }
end

#format_description_event(header) ⇒ Object

Parse fields for a Format_description event.

Implemented in sql/log_event.cc line ~4123 in Format_description_log_event::write



219
220
221
222
223
224
225
226
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 219

# Read the body of a Format_description event.
#
# Consumes, in order: a 16-bit binlog version, a 50-byte server version
# string (read with `read_nstringz`), a 32-bit creation timestamp and an
# 8-bit header length.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :binlog_version, :server_version, :create_timestamp and
#   :header_length
def format_description_event(header)
  {
    binlog_version: parser.read_uint16,
    server_version: parser.read_nstringz(50),
    create_timestamp: parser.read_uint32,
    header_length: parser.read_uint8,
  }
end

#format_gtid(sid, gno_or_ivs) ⇒ Object

Format a GTID as the hex-formatted SID, a colon, and the given GNO or interval string. Example: 6d9190a2-cca6-11e8-aa8c-42010aef0019:551845019



608
609
610
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 608

# Render a GTID as "<formatted sid>:<gno or intervals>".
#
# @param sid [String] raw SID bytes, passed through `format_gtid_sid`
# @param gno_or_ivs [#to_s] a single GNO or a pre-joined interval string
# @return [String] the two parts joined by a colon
def format_gtid(sid, gno_or_ivs)
  # %s stringifies with to_s, exactly as string interpolation does.
  format("%s:%s", format_gtid_sid(sid), gno_or_ivs)
end

#format_gtid_sid(sid) ⇒ Object



603
604
605
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 603

# Render a SID as five dash-separated hex groups covering bytes 0-3, 4-5,
# 6-7, 8-9 and 10-15 (the familiar 8-4-4-4-12 UUID layout when `sid` is 16
# bytes long).
#
# @param sid [String] raw SID bytes
# @return [String] hex groups joined with "-"
def format_gtid_sid(sid)
  byte_groups = [0..3, 4..5, 6..7, 8..9, 10..15]
  hex_groups = byte_groups.map do |range|
    in_hex(sid[range])
  end
  hex_groups.join("-")
end

#generic_rows_event_v1(header) ⇒ Object Also known as: write_rows_event_v1, update_rows_event_v1, delete_rows_event_v1

Parse fields for any of the row-based replication row events:

  • Write_rows which is used for INSERT.

  • Update_rows which is used for UPDATE.

  • Delete_rows which is used for DELETE.

Implemented in sql/log_event.cc line ~8039 in Rows_log_event::write_data_header and Rows_log_event::write_data_body



570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 570

# Read the body of a v1 rows event (write, update or delete).
#
# Reads a 48-bit table id (resolved through @table_map), a 2-byte flag
# bitmap decoded with GENERIC_ROWS_EVENT_FLAGS and the column count, then
# one "columns used" bit array per row image the event type carries:
#
#   * :write_rows_event_v1  - :after only
#   * :delete_rows_event_v1 - :before only
#   * :update_rows_event_v1 - :before, then :after
#
# Any other event type reads no bit arrays. The row images themselves are
# produced by `_generic_rows_event_row_images_v1`.
#
# @param header [Hash] parsed event header; :event_type selects the images
# @return [Hash] :table, :flags and :row_image
def generic_rows_event_v1(header)
  table_id = parser.read_uint48
  fields = {
    table: @table_map[table_id],
    flags: parser.read_uint_bitmap_by_size_and_name(2, GENERIC_ROWS_EVENT_FLAGS),
  }
  column_count = parser.read_varint

  images =
    case header[:event_type]
    when :write_rows_event_v1  then [:after]
    when :delete_rows_event_v1 then [:before]
    when :update_rows_event_v1 then [:before, :after]
    else []
    end

  # One bit array per image, read in the order listed above.
  columns_used = images.each_with_object({}) do |image, used|
    used[image] = parser.read_bit_array(column_count)
  end

  fields[:row_image] = _generic_rows_event_row_images_v1(header, fields, columns_used)
  fields
end

#gtid_log_event(header) ⇒ Object



633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 633

# Read the body of a GTID log event.
#
# Consumes, in order: an 8-bit flags value, a 16-byte SID, a 64-bit GNO,
# an 8-bit logical timestamp type and two 64-bit logical timestamps (last
# committed, sequence number). The SID and GNO are combined into a string
# by `format_gtid` once every field has been read.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :flags, :gtid and :lts (itself a hash with :type,
#   :last_committed and :sequence_number)
def gtid_log_event(header)
  flags = parser.read_uint8
  sid, gno = parser.read_nstring(16), parser.read_uint64
  logical_timestamps = {
    type: parser.read_uint8,
    last_committed: parser.read_uint64,
    sequence_number: parser.read_uint64,
  }

  { flags: flags, gtid: format_gtid(sid, gno), lts: logical_timestamps }
end

#in_hex(bytes) ⇒ Object



599
600
601
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 599

# Hex-encode a byte sequence as lowercase, two digits per byte.
#
# @param bytes [#each_byte] typically a binary String
# @return [String] concatenated hex digits ("" for empty input)
def in_hex(bytes)
  hex_pairs = bytes.each_byte.map { |byte| format("%02x", byte) }
  hex_pairs.join
end

#intvar_event(header) ⇒ Object

Parse fields for an Intvar event.

Implemented in sql/log_event.cc line ~5326 in Intvar_log_event::write



338
339
340
341
342
343
344
345
346
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 338

# Read the body of an Intvar event.
#
# Consumes an 8-bit type code followed by a 64-bit value; the code is also
# looked up in INTVAR_EVENT_INTVAR_TYPES to give it a name.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :intvar_type (raw code), :intvar_name (lookup result,
#   which may be nil) and :intvar_value
def intvar_event(header)
  type_code = parser.read_uint8

  {
    intvar_type: type_code,
    intvar_name: INTVAR_EVENT_INTVAR_TYPES[type_code],
    intvar_value: parser.read_uint64,
  }
end

#previous_gtids_log_event(header) ⇒ Object



612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 612

# Read the body of a Previous_gtids log event.
#
# Layout as consumed here: a 64-bit SID count, then for each SID a 16-byte
# SID, a 64-bit interval count and that many (start, end) pairs of 64-bit
# integers. Each SID is rendered through `format_gtid` with its intervals
# written as "start-end" and joined by ":".
#
# NOTE(review): the stored end value is printed verbatim. MySQL may store
# interval ends exclusively (first GNO after the interval), in which case
# the text differs from what the server prints — confirm before relying on
# the exact numbers.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] { previous_gtids: [String, ...] }, one string per SID
def previous_gtids_log_event(header)
  sid_count = parser.read_uint64

  gtid_sets = sid_count.times.map do
    sid = parser.read_nstring(16)
    interval_count = parser.read_uint64
    intervals = interval_count.times.map do
      first_gno = parser.read_uint64
      last_gno = parser.read_uint64
      "#{first_gno}-#{last_gno}"
    end
    format_gtid(sid, intervals.join(":"))
  end

  { previous_gtids: gtid_sets }
end

#query_event(header) ⇒ Object

Parse fields for a Query event.

Implemented in sql/log_event.cc line ~2214 in Query_log_event::write



322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 322

# Read the body of a Query event.
#
# Consumes a 32-bit thread id, a 32-bit elapsed time, an 8-bit database
# name length and a 16-bit error code, then delegates the status block to
# `_query_event_status`. The database name is read as db_length + 1 bytes
# with `read_nstringz`. The query text is whatever `reader.remaining`
# reports for this event, capped at `binlog.max_query_length` bytes.
#
# @param header [Hash] parsed event header, passed to the status helper
#   and to `reader.remaining`
# @return [Hash] :thread_id, :elapsed_time, :error_code, :status, :db and
#   :query
def query_event(header)
  fields = {
    thread_id: parser.read_uint32,
    elapsed_time: parser.read_uint32,
  }

  # The db length comes before the error code on the wire but is only used
  # once the status block has been read.
  db_length = parser.read_uint8
  fields[:error_code] = parser.read_uint16
  fields[:status] = _query_event_status(header, fields)
  fields[:db] = parser.read_nstringz(db_length + 1)

  bytes_to_read = [reader.remaining(header), binlog.max_query_length].min
  fields[:query] = reader.read(bytes_to_read)
  fields
end

#rand_event(header) ⇒ Object

Parse fields for a Rand event.

Implemented in sql/log_event.cc line ~5454 in Rand_log_event::write



360
361
362
363
364
365
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 360

# Read the body of a Rand event: two consecutive 64-bit seed values.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :seed1 and :seed2, in the order they were read
def rand_event(header)
  {
    seed1: parser.read_uint64,
    seed2: parser.read_uint64,
  }
end

#rotate_event(header) ⇒ Object

Parse fields for a Rotate event.

Implemented in sql/log_event.cc line ~5157 in Rotate_log_event::write



231
232
233
234
235
236
237
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 231

# Read the body of a Rotate event.
#
# Consumes a 64-bit position, then treats every byte `reader.remaining`
# reports for this event as the file name.
#
# @param header [Hash] parsed event header, passed to `reader.remaining`
# @return [Hash] :pos and :name
def rotate_event(header)
  position = parser.read_uint64
  name_length = reader.remaining(header)

  { pos: position, name: parser.read_nstring(name_length) }
end

#rows_query_log_event(header) ⇒ Object



594
595
596
597
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 594

# Read the body of a Rows_query log event.
#
# The first byte is read and thrown away; the remaining
# header[:payload_length] - 1 bytes are returned as the query text.
#
# @param header [Hash] parsed event header; must carry :payload_length
# @return [Hash] { query: <bytes read> }
def rows_query_log_event(header)
  # Leading length byte: consumed only to advance the reader.
  reader.read(1)

  query_length = header[:payload_length] - 1
  { query: reader.read(query_length) }
end

#table_map_event(header) ⇒ Object

Parse fields for a Table_map event.

Implemented in sql/log_event.cc line ~8638 in Table_map_log_event::write_data_header and Table_map_log_event::write_data_body



425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 425

# Read the body of a Table_map event and record the table in @table_map.
#
# Consumes a 48-bit table id, a 2-byte flag bitmap decoded with
# TABLE_MAP_EVENT_FLAGS, the database and table names, the column count,
# one type byte per column, the per-column metadata and a nullability bit
# array. A type byte missing from MYSQL_TYPES becomes :unknown_<code>.
#
# The new entry replaces any earlier @table_map entry for the same id.
#
# NOTE(review): in this listing every identifier containing "metadata" had
# been blanked out, leaving invalid Ruby. `columns_metadata` and the helper
# name `_table_map_event_column_metadata` are reconstructions — confirm
# them against the real source file.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :table_id, :flags and :map_entry, where :map_entry holds
#   :db, :table and :columns (an array of { type, nullable, metadata })
def table_map_event(header)
  fields = {}
  fields[:table_id] = parser.read_uint48
  fields[:flags] = parser.read_uint_bitmap_by_size_and_name(2, TABLE_MAP_EVENT_FLAGS)
  map_entry = @table_map[fields[:table_id]] = {}
  map_entry[:db] = parser.read_lpstringz
  map_entry[:table] = parser.read_lpstringz
  columns = parser.read_varint
  columns_type = parser.read_uint8_array(columns).map { |c| MYSQL_TYPES[c] || "unknown_#{c}".to_sym }
  columns_metadata = _table_map_event_column_metadata(columns_type)
  columns_nullable = parser.read_bit_array(columns)

  # Remap overloaded types before we piece together the entire event: when
  # a column's metadata carries its own :type, that type wins and the key
  # is removed from the metadata hash.
  columns.times do |c|
    metadata = columns_metadata[c]
    next unless metadata && metadata[:type]

    columns_type[c] = metadata.delete(:type)
  end

  map_entry[:columns] = columns.times.map do |c|
    {
      :type     => columns_type[c],
      :nullable => columns_nullable[c],
      :metadata => columns_metadata[c],
    }
  end

  fields[:map_entry] = map_entry
  fields
end

#table_metadata_event(header) ⇒ Object

Parse fields for a Table_metadata event, which is specific to Twitter MySQL releases at the moment.

Implemented in sql/log_event.cc line ~8772 (in Twitter MySQL) in Table_metadata_log_event::write_data_header and Table_metadata_log_event::write_data_body



463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 463

# Read the body of a Table_metadata event and attach each column's
# description to the matching @table_map entry.
#
# Consumes a 48-bit table id, a 16-bit column count and a 16-bit flags
# value, then for every column: a 32-bit descriptor length (read but not
# used), an 8-bit type code, a 32-bit length, an 8-bit scale, a 16-bit
# collation id looked up in COLLATION, a 2-byte flag bitmap, and three
# variable-length strings (name, type name, comment). A type code missing
# from MYSQL_TYPES becomes :unknown_<code>.
#
# The table must already be present in @table_map with a :columns array;
# otherwise the description assignment raises.
#
# NOTE(review): the method name and the bitmap's name table had been
# blanked out in this listing. The name comes from the section heading;
# TABLE_METADATA_EVENT_COLUMN_FLAGS is a reconstruction — confirm it
# against the real source file.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] :table (the @table_map entry), :flags and :columns (one
#   description hash per column)
def table_metadata_event(header)
  fields = {}
  table_id = parser.read_uint48
  columns = parser.read_uint16

  fields[:table] = @table_map[table_id]
  fields[:flags] = parser.read_uint16
  fields[:columns] = columns.times.map do |c|
    # Descriptor length: must be consumed to stay aligned, value unused.
    parser.read_uint32
    column_type = parser.read_uint8
    @table_map[table_id][:columns][c][:description] = {
      :type => MYSQL_TYPES[column_type] || "unknown_#{column_type}".to_sym,
      :length => parser.read_uint32,
      :scale => parser.read_uint8,
      :character_set => COLLATION[parser.read_uint16],
      :flags => parser.read_uint_bitmap_by_size_and_name(2,
                TABLE_METADATA_EVENT_COLUMN_FLAGS),
      :name => parser.read_varstring,
      :type_name => parser.read_varstring,
      :comment => parser.read_varstring,
    }
  end
  fields
end

#xid_event(header) ⇒ Object

Parse fields for an Xid event.

Implemented in sql/log_event.cc line ~5559 in Xid_log_event::write



351
352
353
354
355
# File 'lib/mysql_binlog/binlog_event_parser.rb', line 351

# Read the body of an Xid event: a single 64-bit transaction id.
#
# @param header [Hash] the already-parsed event header (not used here)
# @return [Hash] { xid: Integer }
def xid_event(header)
  { xid: parser.read_uint64 }
end