Class: Fluent::MysqlAppenderInput
- Inherits:
- Input
- Object
- Input
- Fluent::MysqlAppenderInput
- Defined in:
- lib/fluent/plugin/in_mysql_appender.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format_tag(tag, param) ⇒ Object
- #get_connection ⇒ Object
- #initialize ⇒ MysqlAppenderInput (constructor): A new instance of MysqlAppenderInput.
- #poll ⇒ Object
- #query(query, con = nil) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ MysqlAppenderInput
Returns a new instance of MysqlAppenderInput.
10 11 12 13 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 10
#
# Builds the plugin instance.
#
# The mysql2 library is required here, when an instance is created, and
# control is then handed to the parent class initializer.
#
# @return [MysqlAppenderInput] a new instance of MysqlAppenderInput
def initialize
  # Loaded at instantiation time rather than when this file is loaded.
  require 'mysql2'

  super
end
Instance Method Details
#configure(conf) ⇒ Object
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 29
#
# Applies the configuration and validates that a tag was supplied.
#
# After delegating to the parent implementation, @interval is replaced by
# the result of Config.time_value (the log line below reports it in "sec").
# A summary of the worker settings is then written to the log.
#
# @param conf [Object] configuration object, forwarded to the parent class
# @raise [Fluent::ConfigError] when @tag is nil
def configure(conf)
  super

  @interval = Config.time_value(@interval)

  missing_tag = "mysql_appender: missing 'tag' parameter. Please add following line into config like 'tag replicator.mydatabase.mytable.${event}.${primary_key}'"
  raise Fluent::ConfigError, missing_tag if @tag.nil?

  $log.info "adding mysql_appender worker. :tag=>#{tag} :query=>#{@query} :limit=>#{limit} :interval=>#{@interval} sec "
end
#format_tag(tag, param) ⇒ Object
86 87 88 89 90 91 92 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 86
#
# Expands the placeholders of a tag template.
#
# Two placeholders are known: `${event}` (replaced by +param[:event].to_s+)
# and `${primary_key}` (replaced by @primary_key). Any other placeholder of
# the form `${lowercase_or_underscore}` is reported through $log.warn and
# replaced by an empty string.
#
# @param tag [String] tag template; it is not modified
# @param param [Hash] values for the substitution; only +:event+ is read
# @return [String] a new string with every placeholder substituted
def format_tag(tag, param)
  pattern = { '${event}' => param[:event].to_s, '${primary_key}' => @primary_key }

  # The whole regexp is a single capture group, so the block argument (the
  # full match) is exactly what `$1` would hold.
  tag.gsub(/(\${[a-z_]+})/) do |placeholder|
    unless pattern.key?(placeholder)
      $log.warn "mysql_appender: missing placeholder. :tag=>#{tag} :placeholder=>#{placeholder}"
    end
    pattern[placeholder]
  end
end
#get_connection ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 106
#
# Opens a new MySQL connection from the configured settings.
#
# When creating the client fails with a StandardError, the error is logged
# as a warning, the method sleeps for @interval and tries again; it keeps
# retrying until a client could be created.
#
# Only StandardError is retried. Exceptions outside that hierarchy
# (Interrupt, SystemExit, NoMemoryError, ...) propagate to the caller
# instead of being swallowed by the retry loop.
#
# @return [Mysql2::Client] the newly created client
def get_connection
  Mysql2::Client.new({
    :host => @host,
    :port => @port,
    :username => @username,
    :password => @password,
    :database => @database,
    :encoding => @encoding,
    :reconnect => true,
    :stream => true,
    :cache_rows => false
  })
rescue StandardError => e
  $log.warn "mysql_appender: #{e}"
  sleep @interval
  retry
end
#poll ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 58
#
# Endless polling loop: fetches rows newer than the last seen primary key,
# emits each of them, and sleeps for @interval between batches.
#
# For every batch the method
# 1. builds the select statement from @query (double quotes removed) plus a
#    `where <primary_key> > <last_id> order by ... limit ...` suffix,
# 2. runs it through #query, which may hand back a different connection,
# 3. emits each row through `router.emit` with the formatted tag and a
#    timestamp (Engine.now, or the value of @time_column when configured),
# 4. remembers the primary key of the most recently emitted row in @last_id,
# 5. closes the connection, logs a summary and sleeps.
#
# The loop only ends when something raises; it has no normal return path
# other than a StopIteration reaching Kernel#loop.
def poll
  con = get_connection
  loop do
    rows_count = 0
    start_time = Time.now
    select_query = @query.delete('"') + " where #{primary_key} > #{last_id} order by #{primary_key} asc limit #{limit}"
    rows, con = query(select_query, con)

    event_tag = nil
    rows.each do |row|
      # The tag does not depend on the row, so it is formatted once per
      # batch (and not at all for an empty batch).
      event_tag ||= format_tag(@tag, {:event => :insert})

      td_time =
        if @time_column.nil?
          Engine.now
        else
          time_value = row[@time_column]
          # Time.parse only accepts strings; a value that already is a Time
          # is used directly instead of raising TypeError.
          time_value.is_a?(Time) ? time_value.to_i : Time.parse(time_value.to_s).to_i
        end

      # Stringify values of these types before the record is emitted.
      row.each { |k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal) }
      router.emit(event_tag, td_time, row)
      rows_count += 1

      # Rows arrive in ascending primary-key order, so the last assignment
      # wins. Updating on every row avoids relying on `rows.size` being
      # known while the result is still being iterated.
      @last_id = row[@primary_key]
    end

    con.close
    elapsed_time = sprintf("%0.02f", Time.now - start_time)
    # NOTE: `tag` here is a method call (no local of that name exists in
    # this scope), so the configured tag template is logged, as before.
    $log.info "mysql_appender: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :last_id=>#{last_id} :elapsed_time=>#{elapsed_time} sec"
    sleep @interval
  end
end
#query(query, con = nil) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 94
#
# Runs a SQL statement, (re)connecting first when necessary.
#
# A new connection is obtained through #get_connection when none was passed
# in, or when the given one does not answer +ping+. If anything in that
# sequence fails with a StandardError, the error is logged as a warning,
# the method sleeps for @interval and starts over.
#
# Only StandardError is retried. Exceptions outside that hierarchy
# (Interrupt, SystemExit, NoMemoryError, ...) propagate to the caller
# instead of being swallowed by the retry loop.
#
# @param query [String] SQL statement to execute
# @param con [Object, nil] connection to reuse; nil opens a new one
# @return [Array] two elements: the value returned by +con.query+ and the
#   connection that was actually used
def query(query, con = nil)
  con = get_connection if con.nil?
  con = get_connection unless con.ping
  [con.query(query), con]
rescue StandardError => e
  $log.warn "mysql_appender: #{e}"
  sleep @interval
  retry
end
#run ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 48
#
# Runs the polling loop and reports any StandardError it raises.
#
# A failure is written to the error log in three entries: a fixed headline,
# the exception message, and the backtrace joined with newlines. The error
# is not re-raised, so the method returns once the failure has been logged.
def run
  poll
rescue StandardError => error
  $log.error "mysql_appender: failed to execute query."
  $log.error "error: #{error.message}"
  $log.error error.backtrace.join("\n")
end
#shutdown ⇒ Object
44 45 46 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 44
#
# Stops the worker thread stored in @thread.
#
# Does nothing when no thread has been stored yet (for example when
# shutdown is called without a preceding start); Thread.kill would raise
# TypeError for a nil argument.
def shutdown
  Thread.kill(@thread) unless @thread.nil?
end
#start ⇒ Object
40 41 42 |
# File 'lib/fluent/plugin/in_mysql_appender.rb', line 40
#
# Launches #run on a new thread and keeps the thread in @thread so that it
# can be referenced later (see the use of @thread in #shutdown).
def start
  @thread = Thread.new { run }
end