Class: Fluent::MysqlAppenderInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_mysql_appender.rb

Instance Method Summary collapse

Constructor Details

#initializeMysqlAppenderInput

Returns a new instance of MysqlAppenderInput.



10
11
12
13
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 10

def initialize
  require 'mysql2'
  super
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 29

def configure(conf)
  super
  @interval = Config.time_value(@interval)

  if @tag.nil?
    raise Fluent::ConfigError, "mysql_appender: missing 'tag' parameter. Please add following line into config like 'tag replicator.mydatabase.mytable.${event}.${primary_key}'"
  end

  $log.info "adding mysql_appender worker. :tag=>#{tag} :query=>#{@query} :limit=>#{limit} :interval=>#{@interval} sec "
end

#format_tag(tag, param) ⇒ Object



86
87
88
89
90
91
92
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 86

def format_tag(tag, param)
  pattern = {'${event}' => param[:event].to_s, '${primary_key}' => @primary_key}
  tag.gsub(/(\${[a-z_]+})/) do
    $log.warn "mysql_appender: missing placeholder. :tag=>#{tag} :placeholder=>#{$1}" unless pattern.include?($1)
    pattern[$1]
  end
end

#get_connectionObject



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 106

def get_connection
  begin
    return Mysql2::Client.new({
      :host => @host,
      :port => @port,
      :username => @username,
      :password => @password,
      :database => @database,
      :encoding => @encoding,
      :reconnect => true,
      :stream => true,
      :cache_rows => false
    })
  rescue Exception => e
    $log.warn "mysql_appender: #{e}"
    sleep @interval
    retry
  end
end

#pollObject



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 58

def poll
  con = get_connection()
  loop do
    rows_count = 0
    start_time = Time.now
    select_query = @query.gsub(/"/,'') + " where #{primary_key} > #{last_id} order by #{primary_key} asc limit #{limit}"
    rows, con = query(select_query, con)
    rows.each_with_index do |row, index|
      tag = format_tag(@tag, {:event => :insert})
      if @time_column.nil? then
          td_time = Engine.now
      else
          td_time = Time.parse(row[@time_column]).to_i
      end
      row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
      router.emit(tag, td_time, row)
      rows_count += 1
      if index == rows.size - 1
        @last_id = row[@primary_key]
      end
    end
    con.close
    elapsed_time = sprintf("%0.02f", Time.now - start_time)
    $log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"
    sleep @interval
  end
end

#query(query, con = nil) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 94

def query(query, con = nil)
  begin
    con = con.nil? ? get_connection : con
    con = con.ping ? con : get_connection
    return con.query(query), con
  rescue Exception => e
    $log.warn "mysql_appender: #{e}"
    sleep @interval
    retry
  end
end

#runObject



48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 48

def run
  begin
    poll
  rescue StandardError => e
    $log.error "mysql_appender: failed to execute query."
    $log.error "error: #{e.message}"
    $log.error e.backtrace.join("\n")
  end
end

#shutdownObject



44
45
46
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 44

def shutdown
  Thread.kill(@thread)
end

#startObject



40
41
42
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 40

def start
  @thread = Thread.new(&method(:run))
end