Class: Fluent::MysqlBinlogFlydataInput

Inherits:
MysqlBinlogInput
  • Object
show all
Includes:
MysqlBinlogFlydataInputPreference
Defined in:
lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb

Constant Summary

Constants included from MysqlBinlogFlydataInputPreference

Fluent::MysqlBinlogFlydataInputPreference::CUSTOM_CONFIG_PARAMS

Instance Method Summary collapse

Methods included from MysqlBinlogFlydataInputPreference

included

Constructor Details

#initializeMysqlBinlogFlydataInput

Returns a new instance of MysqlBinlogFlydataInput.



43
44
45
46
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 43

def initialize
  super
  install_custom_signal_handler
end

Instance Method Details

#configure(conf) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 52

def configure(conf)
  super
  @binlog_position_file = Mysql::BinLogPositionFile.new(@position_file)
  unless @binlog_position_file.exists?
    raise "No position file(#{@position_file}). Initial synchronization is required before starting."
  end
  load_custom_conf
  $log.info "mysql host:\"#{@host}\" username:\"#{@username}\" database:\"#{@database}\" tables:\"#{@tables}\" tables_append_only:\"#{tables_append_only}\""
  @tables = @tables.split(/,\s*/)
  @omit_events = Hash.new
  @tables_append_only.split(/,\s*/).each do |table|
    @tables << table unless @tables.include?(table)
    @omit_events[table] = [:delete]
  end
  sync_fm = Flydata::FileUtil::SyncFileManager.new(nil) # Passing nil for data_entry as this class does not use methods which require data_entry

  @context = Mysql::Context.new(
    database: @database, tables: @tables,
    tag: @tag, sync_fm: sync_fm, omit_events: @omit_events
  )
  @record_dispatcher = Mysql::FlydataBinlogRecordDispatcher.new(@context)
end

#event_listener(event) ⇒ Object



124
125
126
127
128
129
130
131
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 124

def event_listener(event)
  @record_dispatcher.dispatch(event)
rescue Exception => e
  position = @binlog_position_file.read
  $log.error "error occured while processing #{event.event_type} event at #{position}\n#{e.message}\n#{$!.backtrace.join("\n")}"
  # Not reraising a StandardError because the underlying code can't handle an error well.
  raise unless e.kind_of?(StandardError)
end

#install_custom_signal_handlerObject

Hack: All that has been added here is ‘Fluent::Engine.shutdown_source`. This should be in fluentd’s supervisor#install_main_process_signal_handlers



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 157

def install_custom_signal_handler
    trap :USR1 do
      $log.debug "fluentd main process get SIGUSR1"
      $log.info "force flushing buffered events"
      #@log.reopen!

      # Creating new thread due to mutex can't lock
      # in main thread during trap context
      Thread.new {
        begin
          Fluent::Engine.shutdown_source
          Fluent::Engine.flush!
          $log.debug "flushing thread: flushed"
        rescue Exception => e
          $log.warn "flushing thread error: #{e}"
        end
      }.run
    end
end

#runObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 95

def run
  start_kodama(mysql_url) do |c|
    c.binlog_position_file = @position_file
    c.connection_retry_limit = @retry_limit
    c.connection_retry_wait = @retry_wait
    c.log_level = @log_level.to_sym
    @listen_events.each do |event_type|
      $log.trace { "registered binlog event listener '#{event_type}'" }
      c.send("on_#{event_type}", &method(:event_listener))
    end
  end
rescue
  $log.error "unexpected error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
  raise
rescue SignalException
  $log.debug "signal exception. exception: #{$!.class.to_s}, error: #{$!.to_s}"
  raise
rescue Exception
  $log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
  raise
end

#shutdownObject



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 133

def shutdown
  if @thread and @thread.alive?
    $log.info "Requesting stop Kodama"
    @kodama_client.stop_request
    if wait_till_safe_to_stop
      $log.info "Killing Kodama client"
      Thread.kill(@thread)
    else
      $log.error "Unable to stop Kodama"
    end
  end
end

#startObject



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 75

def start
  super
  positions_path = @context.sync_fm.table_positions_dir_path
  Dir.mkdir positions_path unless File.exists? positions_path
rescue Binlog::Error
  if (/basic_string::_M_replace_aux/ === $!.to_s)
    # TODO Fix the root cause in mysql-replication-listener
    $log.error <<EOS
a mysql-replication-listener error.  This could have been caused by one of the following reasons.
- Failed on connect: Your host is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
EOS
  else
    $log.error "unexpected mysql-replication-listener error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
  end
  raise
rescue Exception
  $log.error "unexpected fatal error. exception: #{$!.class.to_s}, error: #{$!.to_s}\n#{$!.backtrace.join("\n")}"
  raise
end

#start_kodama(options, &block) ⇒ Object



117
118
119
120
121
122
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 117

def start_kodama(options, &block)
  @kodama_client = Kodama::Client.new(Kodama::Client.mysql_url(options))
  @kodama_client.logger = $log
  block.call(@kodama_client)
  @kodama_client.start
end

#wait_till_safe_to_stopObject



146
147
148
149
150
151
152
153
# File 'lib/flydata/fluent-plugins/in_mysql_binlog_flydata.rb', line 146

def wait_till_safe_to_stop
  retry_count = 5
  1.upto(retry_count) do |i|
    return true if @kodama_client.safe_to_stop?
    sleep 3
  end
  false
end