Class: Fluent::MysqlBinlogFlydataInput
Constant Summary
Fluent::MysqlBinlogFlydataInputPreference::CUSTOM_CONFIG_PARAMS
Instance Method Summary
collapse
included
Constructor Details
Returns a new instance of MysqlBinlogFlydataInput.
45
46
47
48
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 45
# Creates the plugin instance.
#
# Runs the parent initializer first, then registers this plugin's custom
# signal handler via #install_custom_signal_handler.
def initialize
  super()
  install_custom_signal_handler
end
|
Instance Method Details
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 57
# Configures the plugin.
#
# After delegating to the parent implementation this method:
# * wraps @position_file in a Mysql::BinLogPositionFile and refuses to
#   continue when that file does not exist,
# * calls load_custom_conf,
# * logs the connection settings and the mysql client/server versions,
# * splits @tables / @tables_append_only into a table list (append-only
#   tables are added to the list and get their :delete events omitted),
# * builds the sync file manager, table metadata, context, record
#   dispatcher and idle event detector used by the other methods.
#
# @param conf the configuration object (forwarded to the parent via +super+)
# @raise [RuntimeError] when the binlog position file does not exist
def configure(conf)
  super

  @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file)
  unless @binlog_position_file.exists?
    raise "No position file(#{@position_file}). Initial synchronization is required before starting."
  end

  load_custom_conf

  $log.info "mysql host:\"#{@host}\" port:\"#{@port}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
  $log.info "mysql client version: #{`mysql -V`}"

  # Query the server version without going through a shell: the password is
  # handed over in the child's environment and every other setting is its
  # own argv entry, so characters such as `"`, `$` or a backtick in a
  # configured value can neither break the command nor be executed.
  # stderr is discarded, as before.
  server_version = IO.popen(
    { 'MYSQL_PWD' => @password.to_s },
    ['mysql', '-h', @host.to_s, '--port', @port.to_s, '-u', @username.to_s, '-e', 'select version();'],
    err: File::NULL,
    &:read
  )
  $log.info "mysql server version: #{server_version}"

  @tables = @tables.split(/,\s*/)
  @omit_events = {}
  @tables_append_only.split(/,\s*/).each do |table|
    @tables << table unless @tables.include?(table)
    # Append-only tables never propagate deletes.
    @omit_events[table] = [:delete]
  end

  @sync_fm = Flydata::FileUtil::SyncFileManager.new(nil)
  table_meta = Mysql::TableMeta.new(
    mysql_url: mysql_url, database: @database, tables: @tables)
  table_meta.update

  @context = Mysql::Context.new(
    database: @database, tables: @tables,
    tag: @tag, sync_fm: @sync_fm, omit_events: @omit_events,
    table_meta: table_meta
  )
  @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context)
  @idle_event_detector = IdleEventDetector.new(@initial_idle_interval, @continuous_idle_interval, @check_interval)
end
|
#event_listener(event) ⇒ Object
150
151
152
153
154
155
156
157
158
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 150
# Callback invoked for each binlog event.
#
# Notifies the idle event detector and hands the event to the record
# dispatcher. Any exception is logged together with the current content of
# the binlog position file; StandardError (and subclasses) is then
# swallowed, anything else is re-raised.
#
# @param event the binlog event; must respond to +event_type+
# @return the dispatcher's result, or nil when a StandardError was swallowed
def event_listener(event)
  @idle_event_detector.notify
  @record_dispatcher.dispatch(event)
rescue Exception => error
  binlog_position = @binlog_position_file.read
  $log.error "error occured while processing #{event.event_type} event at #{binlog_position}\n#{error.message}\n#{error.backtrace.join("\n")}"
  # Only non-StandardError exceptions propagate to the caller.
  raise unless error.is_a?(StandardError)
end
|
#install_custom_signal_handler ⇒ Object
Hack: the only thing added here is `Fluent::Engine.shutdown_source`. This should be in fluentd's Supervisor#install_main_process_signal_handlers.
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 186
# Registers a SIGUSR1 handler.
#
# On SIGUSR1 the handler logs the signal, then spawns a thread that calls
# Fluent::Engine.shutdown_source followed by Fluent::Engine.flush!.
# Any exception raised inside that thread is logged as a warning.
#
# @return whatever +trap+ returns
def install_custom_signal_handler
  trap(:USR1) do
    $log.debug "fluentd main process get SIGUSR1"
    $log.info "force flushing buffered events"
    flusher = Thread.new do
      begin
        Fluent::Engine.shutdown_source
        Fluent::Engine.flush!
        $log.debug "flushing thread: flushed"
      rescue Exception => error
        $log.warn "flushing thread error: #{error}"
      end
    end
    flusher.run
  end
end
|
#run ⇒ Object
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 121
def run
start_kodama(mysql_url) do |c|
c.binlog_position_file = @position_file
c.connection_retry_limit = @retry_limit
c.connection_retry_wait = @retry_wait
c.log_level = @log_level.to_sym
@listen_events.each do |event_type|
$log.trace { "registered binlog event listener '#{event_type}'" }
c.send("on_#{event_type}", &method(:event_listener))
end
end
rescue
$log.error "unexpected error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
raise
rescue SignalException
$log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}"
raise
rescue Exception
$log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
raise
end
|
#shutdown ⇒ Object
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 160
# Stops the plugin.
#
# Stops the idle event detector. If the worker thread (@thread) is still
# alive, asks the Kodama client to stop and waits via
# #wait_till_safe_to_stop; the thread is killed only when that wait
# succeeds, otherwise an error is logged and the thread is left running.
# Finally the sync file manager is closed.
def shutdown
  @idle_event_detector.stop

  worker = @thread
  if worker && worker.alive?
    $log.info "Requesting stop Kodama"
    @kodama_client.stop_request

    if wait_till_safe_to_stop
      $log.info "Killing Kodama client"
      worker.kill
    else
      $log.error "Unable to stop Kodama"
    end
  end

  @sync_fm.close
end
|
#start ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 90
# Starts the plugin.
#
# Calls the parent implementation, starts the idle event detector with a
# callback that logs when binlog events stop arriving / arrive again, and
# creates the table positions directory reported by the context's sync file
# manager when it does not exist yet.
#
# Every exception is logged and re-raised. A Binlog::Error whose message
# contains "basic_string::_M_replace_aux" gets a dedicated hint about
# blocked hosts.
def start
  super

  @idle_event_detector.start do |reason, timestamp|
    case reason
    when :event_not_coming, :event_still_not_coming
      $log.warn "No binary log event since #{timestamp}"
    when :event_arrived_finally
      $log.info "Binary log event has come at #{timestamp}"
    end
  end

  positions_path = @context.sync_fm.table_positions_dir_path
  # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
  Dir.mkdir positions_path unless File.exist? positions_path
rescue Binlog::Error => e
  if /basic_string::_M_replace_aux/ === e.to_s
    $log.error <<EOS
a mysql-replication-listener error. This could have been caused by one of the following reasons.
- Failed on connect: Your host is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
EOS
  else
    $log.error "unexpected mysql-replication-listener error. exception: #{e.class}, error: #{e}\n#{e.backtrace.join("\n")}"
  end
  raise
rescue Exception => e
  $log.error "unexpected fatal error. exception: #{e.class}, error: #{e}\n#{e.backtrace.join("\n")}"
  raise
end
|
#start_kodama(options, &block) ⇒ Object
143
144
145
146
147
148
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 143
# Builds the Kodama client, lets the caller configure it, and starts it.
#
# The client is created from the URL that Kodama::Client.mysql_url derives
# from +options+, stored in @kodama_client, and given $log as its logger
# before the block runs.
#
# @param options passed to Kodama::Client.mysql_url
# @param block called with the new client before it is started
# @return the result of the client's +start+
def start_kodama(options, &block)
  @kodama_client = client = Kodama::Client.new(Kodama::Client.mysql_url(options))
  client.logger = $log
  block.call(client)
  client.start
end
|
#wait_till_safe_to_stop ⇒ Object
175
176
177
178
179
180
181
182
|
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 175
# Polls the Kodama client until it reports that stopping is safe.
#
# Checks +safe_to_stop?+ up to five times, sleeping three seconds after
# every unsuccessful check.
#
# @return [Boolean] true as soon as a check succeeds, false when all fail
def wait_till_safe_to_stop
  max_checks = 5
  max_checks.times do
    return true if @kodama_client.safe_to_stop?
    sleep 3
  end
  false
end
|