Class: Fluent::MysqlBulkOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MysqlBulkOutput
- Defined in:
- lib/fluent/plugin/out_mysql_bulk.rb
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MysqlBulkOutput
constructor
A new instance of MysqlBulkOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MysqlBulkOutput
Returns a new instance of MysqlBulkOutput.
21 22 23 24 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 21 def initialize super require 'mysql2-cs-bind' end |
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
19 20 21 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 19 def handler @handler end |
Instance Method Details
#client ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 66 def client Mysql2::Client.new({ :host => @host, :port => @port, :username => @username, :password => @password, :database => @database, :flags => Mysql2::Client::MULTI_STATEMENTS }) end |
#configure(conf) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 26 def configure(conf) super if @column_names.nil? raise Fluent::ConfigError, "column_names MUST be specified, but missing" end if @on_duplicate_key_update if @on_duplicate_update_keys.nil? raise Fluent::ConfigError, "on_duplicate_key_update = true , on_duplicate_update_keys nil!" end @on_duplicate_update_keys = @on_duplicate_update_keys.split(',') @on_duplicate_key_update_sql = " ON DUPLICATE KEY UPDATE " updates = [] @on_duplicate_update_keys.each{|update_column| updates.push(" #{update_column} = VALUES(#{update_column})") } @on_duplicate_key_update_sql += updates.join(',') end @column_names = @column_names.split(',') @key_names = @key_names.nil? ? @column_names : @key_names.split(',') @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}} end |
#format(tag, time, record) ⇒ Object
62 63 64 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 62 def format(tag, time, record) [tag, time, @format_proc.call(tag, time, record)].to_msgpack end |
#shutdown ⇒ Object
58 59 60 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 58 def shutdown super end |
#start ⇒ Object
53 54 55 56 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 53 def start @handler = client super end |
#write(chunk) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 77 def write(chunk) values_templates = [] values = Array.new chunk.msgpack_each { |tag, time, data| values_templates.push "(#{@column_names.map{|key| '?'}.join(',')})" values.concat(data) } sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values_templates.join(',')}" if @on_duplicate_key_update sql += @on_duplicate_key_update_sql end $log.info "bulk insert sql => [#{Mysql2::Client.pseudo_bind(sql, values)}]" @handler.xquery(sql, values) end |