Class: Flydata::Parser::Mysql::MysqlDumpParser

Inherits: Object
Defined in:
lib/flydata/parser/mysql/dump_parser.rb

Defined Under Namespace

Modules: State
Classes: InsertParser

Constant Summary

BINLOG_INV_ERROR_CHUNK_SIZE = 250

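Instance Attribute Summary

#binlog_pos ⇒ Object
Returns the value of attribute binlog_pos.

Instance Method Summary

#initialize(option = {}) ⇒ MysqlDumpParser
Returns a new instance of MysqlDumpParser.

#parse(dump_io, create_table_block, insert_record_block, check_point_block) ⇒ Object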

Constructor Details

#initialize(option = {}) ⇒ MysqlDumpParser

Returns a new instance of MysqlDumpParser. Raises ArgumentError if option[:binlog_pos] is not given.


Source (lines 284–287):
# File 'lib/flydata/parser/mysql/dump_parser.rb', line 284

# Builds a parser for a dump taken at a known binlog position.
#
# @param option [Hash] settings; :binlog_pos is mandatory. The whole hash is
#   retained in @option (#parse reads its resume keys from there).
# @raise [ArgumentError] when option[:binlog_pos] is nil or false
def initialize(option = {})
  @binlog_pos = option[:binlog_pos]
  raise ArgumentError.new("binlog position is required") unless @binlog_pos
  @option = option
end

Instance Attribute Details

#binlog_pos ⇒ Object

Returns the value of attribute binlog_pos.


Source (lines 281–283):
# File 'lib/flydata/parser/mysql/dump_parser.rb', line 281

# Reader for @binlog_pos, which #initialize fills from option[:binlog_pos].
#
# @return [Object] the stored binlog position (nil if never assigned)
def binlog_pos; @binlog_pos; end

Instance Method Details

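#parse(dump_io, create_table_block, insert_record_block, check_point_block) ⇒ Object

Parses a mysqldump stream. It calls create_table_block for each CREATE TABLE statement and insert_record_block for each INSERT statement. It calls check_point_block at these checkpoints:
- after the "-- Server version" header;
- after each CREATE TABLE;
- after each UNLOCK TABLES line;
- after each INSERT whose callback returns a truthy value.

Returns the binlog position given to the constructor.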


Source (lines 289–457):
# File 'lib/flydata/parser/mysql/dump_parser.rb', line 289

# Parses a mysqldump stream with a line-oriented state machine:
#
#   START --("-- Server version")--> CREATE_TABLE --("CREATE TABLE")-->
#   CREATE_TABLE_COLUMNS --(first non-column line)--> CREATE_TABLE_CONSTRAINTS
#   --(line starting with ")")--> INSERT_RECORD <--> PARSING_INSERT_RECORD
#   INSERT_RECORD --("UNLOCK ...")--> CREATE_TABLE
#
# If @option[:last_pos] is set (and is not -1), parsing resumes at that byte
# offset, taking the current state from @option[:state] and the current table
# from @option[:mysql_table] -- the same kinds of values check_point_block
# receives.
#
# @param dump_io [IO, StringIO] stream to read; must respond to readline, eof?,
#   pos and pos= (pos= is only used when resuming)
# @param create_table_block [#call] called with the table once its CREATE TABLE
#   statement has been read completely
# @param insert_record_block [#call] called with (table, values_set) for each
#   INSERT line; a truthy return value triggers a checkpoint
# @param check_point_block [#call] called with
#   (table, dump_io.pos, bytes_read_so_far, binlog_pos, new_state)
# @return the binlog position passed to the constructor
# @raise [ArgumentError] if dump_io lacks the stream methods listed above
# @raise [RuntimeError] if the parser is in an unknown state (e.g. a bad resume
#   state) or a column definition line cannot be parsed
def parse(dump_io, create_table_block, insert_record_block, check_point_block)
  unless [:readline, :eof?, :pos, :pos=].all? { |meth| dump_io.respond_to?(meth) }
    raise ArgumentError.new("Invalid argument. The first parameter must be io.")
  end

  current_state = State::START
  current_table = nil
  buffered_line = nil # a line already read by one state, handed to the next
  bytesize = 0        # bytes consumed from dump_io so far

  # Returns the next stripped UTF-8 line, preferring a buffered one.
  readline_proc = lambda do
    if buffered_line
      line = buffered_line
      buffered_line = nil
      line
    else
      rawline = dump_io.readline
      # Count bytes before transcoding: the replacement below can change the
      # byte length, which would make bytesize drift away from dump_io.pos.
      bytesize += rawline.bytesize
      # Round-trip through UTF-16 to replace invalid/undefined byte sequences.
      rawline.encode('utf-16', :undef => :replace, :invalid => :replace).encode('utf-8').strip
    end
  end

  # -- Server version 5.6.21-log
  state_start = lambda do
    line = readline_proc.call
    if line.start_with?('-- Server version')
      current_state = State::CREATE_TABLE
      check_point_block.call(nil, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # CREATE TABLE `active_admin_comments` (
  state_create_table = lambda do
    m = /^CREATE TABLE `(?<table_name>[^`]+)`/.match(readline_proc.call)
    if m
      current_table = MysqlTable.new(m[:table_name])
      current_state = State::CREATE_TABLE_COLUMNS
    end
  end

  #   PRIMARY KEY (`id`),   ...until the closing ") ENGINE=..." line
  state_create_table_constraints = lambda do
    line = readline_proc.call
    if line.start_with?(')')
      create_table_block.call(current_table)
      current_state = State::INSERT_RECORD
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    elsif (m = /^PRIMARY KEY \((?<primary_keys>[^\)]+)\)/.match(line))
      current_table.primary_keys = m[:primary_keys].split(',').map { |pk_str| pk_str[1..-2] }
    end
  end

  #   `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
  state_create_table_columns = lambda do
    line = readline_proc.call
    if line.start_with?('`')
      current_table.add_column(parse_column_definition(line))
    else
      # First line that is not a column: let the constraints state handle it
      # without reading another line.
      current_state = State::CREATE_TABLE_CONSTRAINTS
      buffered_line = line
      state_create_table_constraints.call
    end
  end

  state_insert_record = lambda do
    line = readline_proc.call
    if line.start_with?('INSERT')
      # Leave the INSERT line for the PARSING_INSERT_RECORD state.
      buffered_line = line
      current_state = State::PARSING_INSERT_RECORD
    elsif line.start_with?('UNLOCK')
      current_state = State::CREATE_TABLE
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  state_parsing_insert_record = lambda do
    values_set = InsertParser.new.parse(readline_proc.call)
    current_state = State::INSERT_RECORD
    if insert_record_block.call(current_table, values_set)
      check_point_block.call(current_table, dump_io.pos, bytesize, @binlog_pos, current_state)
    end
  end

  # Resume (only when using a dump file).
  if @option[:last_pos] && @option[:last_pos].to_i != -1
    dump_io.pos = @option[:last_pos].to_i
    current_state = @option[:state]
    current_table = @option[:mysql_table]
    bytesize = dump_io.pos
  end

  # Keep going while a line is still buffered, even at EOF, so that a final
  # INSERT statement is not silently dropped. Every state consumes the buffered
  # line first, so this cannot spin.
  until buffered_line.nil? && dump_io.eof?
    case current_state
    when State::START then state_start.call
    when State::CREATE_TABLE then state_create_table.call
    when State::CREATE_TABLE_COLUMNS then state_create_table_columns.call
    when State::CREATE_TABLE_CONSTRAINTS then state_create_table_constraints.call
    when State::INSERT_RECORD then state_insert_record.call
    when State::PARSING_INSERT_RECORD then state_parsing_insert_record.call
    else
      # Without this, an unknown state would loop forever without reading.
      raise "Unknown parser state: #{current_state.inspect}"
    end
  end

  @binlog_pos
end

# Parses one column line of a CREATE TABLE statement, e.g.
#   `author_type` varchar(255) COLLATE utf8_unicode_ci DEFAULT NULL,
# into a Hash with :column_name, :format_type_str, :format_type and, when
# present, :format_size or :decimal_precision/:decimal_scale, :default,
# :not_null and :unsigned. Other column options are ignored.
#
# @param line [String] a stripped column line starting with a backtick
# @return [Hash]
# @raise [RuntimeError] if the line has no closing backtick or no type
def parse_column_definition(line)
  line = line.chomp(',')
  # Identifiers are backtick-quoted; an embedded backtick is written as ``.
  m = /\A`(?<name>(?:[^`]|``)*)`\s*(?<rest>.*)\z/.match(line)
  raise "Invalid column definition: #{line}" unless m

  # Quoted literals ('a b', 'it''s', 'x\'y') stay single tokens so that spaces
  # or keywords inside DEFAULT/COMMENT values are not parsed as options.
  tokens = m[:rest].scan(/'(?:[^'\\]|\\.|'')*'|\S+/)
  format_type_str = tokens.shift
  raise "Invalid column definition: #{line}" unless format_type_str

  column = {}
  column[:column_name] = m[:name].gsub('``', '`')
  column[:format_type_str] = format_type_str

  paren = format_type_str.index('(')
  if paren
    format_type = format_type_str[0...paren]
    type_args = format_type_str[paren + 1..-2]
    column[:format_type] = format_type
    if format_type == 'decimal'
      precision, scale = type_args.split(',').map(&:to_i)
      column[:decimal_precision] = precision
      column[:decimal_scale] = scale
    else
      column[:format_size] = type_args.to_i
    end
  else
    column[:format_type] = format_type_str
  end

  while (token = tokens.shift)
    case token
    when 'DEFAULT'
      value = tokens.shift
      # Only an unquoted NULL means "no default"; 'NULL' is a string default.
      # NOTE: escaped quotes inside the literal ('' or \') are kept as-is.
      column[:default] =
        if value.nil? || value == 'NULL'
          nil
        elsif value.start_with?("'")
          value[1..-2]
        else
          value
        end
    when 'NOT'
      if tokens.first == 'NULL'
        tokens.shift
        column[:not_null] = true
      end
    when 'unsigned'
      column[:unsigned] = true
    end
  end

  column
end
private :parse_column_definition