Module: Flydata::PluginSupport::SyncRecordEmittable

Included in:
QueryBasedSync::ResponseHandler, SourceMysql::PluginSupport::BinlogRecordHandler
Defined in:
lib/flydata/plugin_support/sync_record_emittable.rb

Constant Summary collapse

TABLE_NAME =

A Flydata JSON tag to specify a table name

:table_name
TYPE =
:type
SEQ =
:seq
RESPECT_ORDER =
:respect_order
SRC_POS =
:src_pos
TABLE_REV =
:table_rev
V =

FlyData record format version

:v

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#contextObject

required



14
15
16
# File 'lib/flydata/plugin_support/sync_record_emittable.rb', line 14

def context
  @context
end

Instance Method Details

#emit_sync_records(records, options) ⇒ Object

Public Interface: Emit sync records to fluent engine

"records" : A record or records for emitting
            Each record needs to be Hash
"options"
  type:     : (required) type (insert, update, delete)
  tag       : (optional) tag (default: @context.tag)
  timestamp : (optional) timestamp (default: current timestamp)
  src_pos   : (required) source position (used for sync:repair)
  table     : (optional) table name
  increment_table_rev : (optional) set true when incrementing table revision


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/flydata/plugin_support/sync_record_emittable.rb', line 27

def emit_sync_records(records, options)
  return if records.nil? || records.empty? # skip
  records = [records] unless records.kind_of?(Array)

  # Check options
  tag = options[:tag] || @context.tag
  timestamp = options[:timestamp] || Time.now.to_i
  type = options[:type]
  raise "type option must be set" if type.to_s.empty?
  src_pos = options[:src_pos]
  raise "src_pos option must be set" if src_pos.to_s.empty?

  seq = nil
  if table = options[:table]
    table_rev = @context.table_revs[table]
    if options[:increment_table_rev]
      table_rev = @context.sync_fm.increment_table_rev(table, table_rev)
      @context.table_revs[table] = table_rev
    end
    seq = @context.sync_fm.get_table_position(table)
  end

  # Add common information to each record
  array = records.collect do |r|
    r[TYPE] = type
    r[RESPECT_ORDER] = true
    r[SRC_POS] = src_pos
    r[V] = FlydataCore::Record::V2

    if table
      seq = @context.sync_fm.increment_table_position(seq)
      r[SEQ] = seq
      r[TABLE_NAME] = table
      r[TABLE_REV] = table_rev
    end
    [timestamp, r]
  end
  Fluent::Engine.emit_array(tag, array)
  if table
    @context.sync_fm.save_table_position(table, seq)
    if options[:set_infinity_to_table_binlog_pos]
      @context.set_infinity_to_table_binlog_pos(table)
    end
  end
end