Class: Fluent::MysqlBinlogFlydataInput

Inherits:
MysqlBinlogInput
  • Object
show all
Includes:
MysqlBinlogFlydataInputPreference
Defined in:
lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb

Constant Summary

Constants included from MysqlBinlogFlydataInputPreference

Fluent::MysqlBinlogFlydataInputPreference::CUSTOM_CONFIG_PARAMS

Instance Method Summary collapse

Methods included from MysqlBinlogFlydataInputPreference

included

Constructor Details

#initialize ⇒ MysqlBinlogFlydataInput

Returns a new instance of MysqlBinlogFlydataInput.



15
16
17
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 15

# Builds the plugin instance; construction is delegated entirely to the
# parent class via +super+.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 22

# Validates the plugin configuration and builds the binlog record handler.
#
# After the parent class has processed +conf+, this method:
# 1. requires the position file (+@position_file+) to exist,
# 2. calls +load_custom_conf+,
# 3. logs the connection settings,
# 4. converts the comma-separated +@tables+ string into an array,
# 5. creates the SyncFileManager and the FlydataMysqlBinlogRecordHandler.
#
# @param conf the configuration passed to the plugin (forwarded to +super+)
# @raise [RuntimeError] if the position file does not exist
def configure(conf)
  super
  # File.exist? is used instead of File.exists?: the latter was deprecated
  # and then removed in Ruby 3.2, where it raises NoMethodError.
  unless File.exist?(@position_file)
    raise "No position file(#{@position_file}). Initial synchronization is required before starting."
  end
  load_custom_conf
  $log.info "mysql host:\"#{@host}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\""
  # From here on @tables is an Array of table names rather than a String.
  @tables = @tables.split(/,\s*/)
  # Passing nil for data_entry as this class does not use methods which require data_entry
  @sync_fm = Flydata::FileUtil::SyncFileManager.new(nil)
  @record_handler = FlydataMysqlBinlogRecordHandler.new(
    database: @database,
    tables: @tables,
    tag: @tag,
    sync_fm: @sync_fm
  )
end

#event_listener(event) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 44

# Hands a binlog event to the record handler and logs any failure.
#
# Errors raised by +@record_handler.dispatch+ are logged together with the
# event type and the current contents of the position file. StandardErrors
# are then swallowed; any other Exception is re-raised.
#
# @param event the binlog event; must respond to +event_type+
# @return the result of +dispatch+, or nil when a StandardError was swallowed
def event_listener(event)
  @record_handler.dispatch(event)
rescue Exception => e
  # Reading the position file is best-effort: a failure here (e.g. the file
  # is missing) must not replace the original error or escape this handler.
  position = begin
    File.read(@position_file)
  rescue StandardError => read_error
    "(position unavailable: #{read_error.message})"
  end
  $log.error "error occured while processing #{event.event_type} event at #{position}"
  $log.error e.message
  $log.error e.backtrace.join("\n")
  # Not reraising a StandardError because the underlying code can't handle an error well.
  raise e unless e.kind_of?(StandardError)
end

#start ⇒ Object



38
39
40
41
42
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 38

# Starts the plugin via the parent class, then makes sure the directory
# reported by +@sync_fm.table_positions_dir_path+ exists.
def start
  super
  positions_path = @sync_fm.table_positions_dir_path
  # File.exist? is used instead of File.exists?: the latter was deprecated
  # and then removed in Ruby 3.2, where it raises NoMethodError.
  Dir.mkdir positions_path unless File.exist? positions_path
end