Class: Fluent::MysqlBulkOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MysqlBulkOutput
- Includes:
- SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_mysql_bulk.rb
Instance Attribute Summary collapse
-
#handler ⇒ Object
Returns the value of attribute handler.
Instance Method Summary collapse
- #client ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MysqlBulkOutput
constructor
A new instance of MysqlBulkOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MysqlBulkOutput
Returns a new instance of MysqlBulkOutput.
37 38 39 40 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 37
#
# Builds a new MysqlBulkOutput.
#
# Runs the superclass initializer first and then loads the
# 'mysql2-cs-bind' library.
# NOTE(review): the require presumably lives here so the gem is only loaded
# once the plugin is instantiated -- confirm before hoisting it to file level.
def initialize
  super
  require 'mysql2-cs-bind'
end
Instance Attribute Details
#handler ⇒ Object
Returns the value of attribute handler.
35 36 37 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 35
#
# Reader for the +handler+ attribute.
#
# @return [Object, nil] whatever is currently stored in @handler
def handler
  @handler
end
Instance Method Details
#client ⇒ Object
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 97
#
# Opens a MySQL connection from the plugin's connection settings.
#
# A brand-new Mysql2::Client is constructed on every call; nothing is
# cached here, so the caller owns the returned connection.
#
# @return [Mysql2::Client] client built from @host, @port, @username,
#   @password and @database, with the MULTI_STATEMENTS flag set
def client
  options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: @database,
    flags: Mysql2::Client::MULTI_STATEMENTS
  }
  Mysql2::Client.new(**options)
end
#configure(conf) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 47
#
# Validates the plugin settings and turns the comma-separated options into
# arrays.
#
# Every list option (column_names, key_names, json_key_names,
# on_duplicate_update_keys) is split on ',' and each entry is stripped, so
# "a, b" and "a,b" are handled identically. When on_duplicate_key_update is
# set, the " ON DUPLICATE KEY UPDATE ..." clause is precomputed into
# @on_duplicate_key_update_sql. key_names falls back to column_names when it
# is not configured.
#
# @param conf [Object] configuration object, passed through to the superclass
# @raise [Fluent::ConfigError] if column_names is missing, or if
#   on_duplicate_key_update is set while on_duplicate_update_keys is nil
def configure(conf)
  super

  if @column_names.nil?
    fail Fluent::ConfigError, 'column_names MUST specified, but missing'
  end

  # One normalizer for all list options keeps whitespace handling consistent.
  split_list = ->(value) { value.split(',').map(&:strip) }

  if @on_duplicate_key_update
    if @on_duplicate_update_keys.nil?
      fail Fluent::ConfigError, 'on_duplicate_key_update = true , on_duplicate_update_keys nil!'
    end
    @on_duplicate_update_keys = split_list.call(@on_duplicate_update_keys)
    assignments = @on_duplicate_update_keys.map do |update_column|
      "#{update_column} = VALUES(#{update_column})"
    end
    @on_duplicate_key_update_sql = " ON DUPLICATE KEY UPDATE #{assignments.join(',')}"
  end

  @column_names = split_list.call(@column_names)
  @key_names = @key_names.nil? ? @column_names : split_list.call(@key_names)
  @json_key_names = split_list.call(@json_key_names) if @json_key_names
end
#format(tag, time, record) ⇒ Object
93 94 95 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 93
#
# Serializes one event for buffering.
#
# The record is first passed through format_proc (defined elsewhere in this
# class); the result is packed together with the tag and time by calling
# to_msgpack on the three-element array.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record handed to format_proc
# @return [Object] result of Array#to_msgpack for [tag, time, formatted record]
def format(tag, time, record)
  formatted = format_proc.call(tag, time, record)
  entry = [tag, time, formatted]
  entry.to_msgpack
end
#shutdown ⇒ Object
89 90 91 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 89
#
# Shuts the plugin down.
#
# Adds no behavior of its own: it only delegates to the superclass
# implementation.
def shutdown
  super
end
#start ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 73
#
# Starts the plugin and records the declared length of each configured
# column.
#
# Runs "SHOW COLUMNS FROM <table>" on a fresh connection and fills
# @max_lengths with one entry per name in @column_names, in the same order:
# the integer N for columns whose type contains char(N) / varchar(N), and
# nil for every other column, including names the table does not have.
#
# The connection is always closed again, even when the query raises, so no
# connection is left open by this method.
def start
  super

  connection = client
  begin
    # Materialize the rows before closing so they can be searched afterwards.
    columns = connection.xquery("SHOW COLUMNS FROM #{@table}").to_a
  ensure
    connection.close
  end

  length_pattern = /(char|varchar)\(([\d]+)\)/
  @max_lengths = @column_names.map do |column|
    info = columns.find { |row| row['Field'] == column }
    # Capture group 2 is the declared length; nil when there is no match.
    declared = info && info['Type'].to_s[length_pattern, 2]
    declared && declared.to_i
  end
end
#write(chunk) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 108
#
# Writes one buffered chunk to MySQL as a single multi-row INSERT.
#
# A new connection is opened through #client and stored in @handler. Each
# entry yielded by chunk.msgpack_each contributes one "(?,?,...)" tuple,
# bound with Mysql2::Client.pseudo_bind. The ON DUPLICATE KEY UPDATE clause
# is appended when @on_duplicate_key_update is set.
#
# The connection is closed in an ensure block, so it is released even when
# building or executing the statement raises.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, data)
def write(chunk)
  @handler = client
  begin
    placeholders = Array.new(@column_names.size, '?').join(',')
    values_template = "(#{placeholders})"

    values = []
    chunk.msgpack_each do |_tag, _time, data|
      values << Mysql2::Client.pseudo_bind(values_template, data)
    end

    sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values.join(',')}"
    sql += @on_duplicate_key_update_sql if @on_duplicate_key_update

    log.info "bulk insert values size (table: #{@table}) => #{values.size}"
    @handler.xquery(sql)
  ensure
    @handler.close
  end
end