Module: Fluent::FlydataSync

Included in:
MysqlBinlogFlydataInput
Defined in:
lib/flydata/fluent-plugins/flydata_plugin_ext/flydata_sync.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/flydata_sync.rb', line 9

def self.included(base)
  base.class_eval do
    include FlushSupport
    prepend TransactionSupport

    config_param :tables, :string
    config_param :tables_append_only, :string

    # binlog plugin
    config_param :tag, :string
    config_param :position_file, :string, default: 'position.log'
  end
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/flydata/fluent-plugins/flydata_plugin_ext/flydata_sync.rb', line 23

def configure(conf)
  super

  @binlog_position_file = self.class::BINLOG_POSITION_FILE_CLASS.new(@position_file)
  unless @binlog_position_file.exists?
    raise "No position file(#{@binlog_position_file.path}). Initial synchronization is required before starting."
  end

  @sync_fm = Flydata::SyncFileManager.new(nil) # Passing nil for data_entry as this class does not use methods which require data_entry
  sent_position_file_path = @sync_fm.sent_source_pos_path(@position_file)
  @sent_position_file = self.class::BINLOG_POSITION_FILE_CLASS.new(sent_position_file_path)

  # Create positions dir
  positions_path = @sync_fm.table_positions_dir_path
  Dir.mkdir positions_path unless File.exists? positions_path

  load_custom_conf

  @tables = @tables.split(/(?:\s*,\s*|\s+)/)
  @omit_events = Hash.new
  @tables_append_only.split(/(?:\s*,\s*|\s+)/).each do |table|
    @tables << table unless @tables.include?(table)
    @omit_events[table] = [:delete, :truncate_table]
  end

  # Remove tables that do not have pos files
  new_tables = @sync_fm.get_new_table_list(@tables, "pos")
  @tables -= new_tables
  $log.info "Not watching these tables: #{new_tables.join(", ")}"
end