Class: Fluent::MysqlBinlogInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_mysql_binlog.rb

Defined Under Namespace

Classes: BinlogUtil

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MysqlBinlogInput

Returns a new instance of MysqlBinlogInput.



6
7
8
9
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 6

# Builds the plugin instance. After the parent initializer has run, the
# 'kodama' library is required here, i.e. it is loaded when an instance is
# created rather than when this file is loaded.
def initialize
  super
  require 'kodama'
end

Instance Method Details

#configure(conf) ⇒ Object



22
23
24
25
26
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 22

# Applies the plugin configuration and derives the list of binlog event
# types to listen for.
#
# When @listen_event has not been set, every entry of
# BinlogUtil::EVENT_TYPES is used. The comma-separated value is split, each
# entry is stripped of surrounding whitespace, and blank entries (such as
# those produced by "a,,b" or "a, ") are discarded so that an empty name is
# never used to build a listener method name.
#
# @param conf the configuration object; handed to the parent implementation
#   by the bare +super+
# @return [Array<String>] the cleaned list stored in @listen_events
def configure(conf)
  super
  @listen_event ||= BinlogUtil::EVENT_TYPES.join(',')
  @listen_events = @listen_event.split(',').map(&:strip).reject(&:empty?)
end

#event_listener(event) ⇒ Object



51
52
53
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 51

# Callback registered for each subscribed binlog event type. Converts the
# event with BinlogUtil.to_hash and emits the result under @tag.
#
# @param event the binlog event object passed to the callback
def event_listener(event)
  # The timestamp is taken before the conversion, matching the order in
  # which the arguments were evaluated when written as a single call.
  emitted_at = Engine.now
  record = BinlogUtil.to_hash(event)
  Engine.emit(@tag, emitted_at, record)
end

#mysql_url ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 55

# Collects the MySQL connection settings held in instance variables.
# Despite the method name, the result is a Hash, not a URL string.
#
# @return [Hash] with the keys :host, :port, :username and :password
def mysql_url
  settings = {}
  settings[:host] = @host
  settings[:port] = @port
  settings[:username] = @username
  settings[:password] = @password
  settings
end

#run ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 37

# Body of the worker thread: starts a Kodama client against the settings
# from #mysql_url, applies the position-file, retry and log-level settings
# from instance variables, and registers #event_listener for every entry in
# @listen_events.
#
# NOTE(review): listener names are built from @listen_events and dispatched
# with +send+; confirm those values are validated against the supported
# event types upstream.
def run
  Kodama::Client.start(mysql_url) do |client|
    client.binlog_position_file = @position_file
    client.connection_retry_limit = @retry_limit
    client.connection_retry_wait = @retry_wait
    client.log_level = @log_level.to_sym
    client.gracefully_stop_on(:QUIT, :INT)

    listener = method(:event_listener)
    @listen_events.each do |type|
      $log.trace { "registered binlog event listener '#{type}'" }
      client.send("on_#{type}", &listener)
    end
  end
end

#shutdown ⇒ Object



33
34
35
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 33

# Stops the worker thread created by #start.
#
# Does nothing when no thread has been stored in @thread (for example when
# #start was never called), instead of raising TypeError from
# Thread.kill(nil).
#
# @return [Thread, nil] the killed thread, or nil when there was none
def shutdown
  Thread.kill(@thread) if @thread
end

#start ⇒ Object



28
29
30
31
# File 'lib/fluent/plugin/in_mysql_binlog.rb', line 28

# Logs the connection target and launches #run in a background thread,
# which is kept in @thread.
#
# @return [Thread] the thread running #run
def start
  # The settings hash includes the MySQL password; leave it out of the log
  # line so credentials are never written to the log.
  loggable = mysql_url.reject { |key, _value| key == :password }
  $log.debug "listening mysql replication on #{loggable}"
  @thread = Thread.new(&method(:run))
end