Class: Fluent::MysqlBinlogFlydataInput
Constant Summary
Fluent::MysqlBinlogFlydataInputPreference::CUSTOM_CONFIG_PARAMS
Instance Method Summary
collapse
included
Constructor Details
Returns a new instance of MysqlBinlogFlydataInput.
15
16
17
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 15
def initialize
super
end
|
Instance Method Details
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 22
def configure(conf)
super
unless File.exists?(@position_file)
raise "No position file(#{@position_file}). Initial synchronization is required before starting."
end
load_custom_conf
$log.info "mysql host:\"#{@host}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\""
@tables = @tables.split(/,\s*/)
@sync_fm = Flydata::FileUtil::SyncFileManager.new(nil) @record_handler = FlydataMysqlBinlogRecordHandler.new(
database: @database,
tables: @tables,
tag: @tag,
sync_fm: @sync_fm)
end
|
#event_listener(event) ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 44
def event_listener(event)
begin
@record_handler.dispatch(event)
rescue Exception => e
position = File.open(@position_file) {|f| f.read }
$log.error "error occured while processing #{event.event_type} event at #{position}"
$log.error e.message
$log.error e.backtrace.join("\n")
raise unless e.kind_of?(StandardError)
end
end
|
#start ⇒ Object
38
39
40
41
42
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 38
def start
super
positions_path = @sync_fm.table_positions_dir_path
Dir.mkdir positions_path unless File.exists? positions_path
end
|