Class: Fluent::MysqlBulkOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_bulk.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeMysqlBulkOutput

Returns a new instance of MysqlBulkOutput.



21
22
23
24
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 21

def initialize
  super
  require 'mysql2-cs-bind'
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



19
20
21
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 19

def handler
  @handler
end

Instance Method Details

#clientObject



75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 75

def client
  Mysql2::Client.new(
      host: @host,
      port: @port,
      username: @username,
      password: @password,
      database: @database,
      flags: Mysql2::Client::MULTI_STATEMENTS
    )
end

#configure(conf) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 26

def configure(conf)
  super

  if @column_names.nil?
    fail Fluent::ConfigError, 'column_names MUST specified, but missing'
  end

  if @on_duplicate_key_update
    if @on_duplicate_update_keys.nil?
      fail Fluent::ConfigError, 'on_duplicate_key_update = true , on_duplicate_update_keys nil!'
    end
    @on_duplicate_update_keys = @on_duplicate_update_keys.split(',')

    @on_duplicate_key_update_sql = ' ON DUPLICATE KEY UPDATE '
    updates = []
    @on_duplicate_update_keys.each do |update_column|
      updates << "#{update_column} = VALUES(#{update_column})"
    end
    @on_duplicate_key_update_sql += updates.join(',')
  end

  @column_names = @column_names.split(',')
  @key_names = @key_names.nil? ? @column_names : @key_names.split(',')
end

#format(tag, time, record) ⇒ Object



71
72
73
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 71

def format(tag, time, record)
  [tag, time, format_proc.call(tag, time, record)].to_msgpack
end

#shutdownObject



67
68
69
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 67

def shutdown
  super
end

#startObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 51

def start
  super
  result = client.xquery("SHOW COLUMNS FROM #{@table}")
  @max_lengths = []
  @column_names.each do |column|
    info = result.select { |x| x['Field'] == column }.first
    r = /(char|varchar)\(([\d]+)\)/
    begin
      max_length = info['Type'].scan(r)[0][1].to_i
    rescue
      max_length = nil
    end
    @max_lengths << max_length
  end
end

#write(chunk) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 86

def write(chunk)
  @handler = client
  values = []
  values_template = "(#{ @column_names.map { |key| '?' }.join(',') })"
  chunk.msgpack_each do |tag, time, data|
    values << Mysql2::Client.pseudo_bind(values_template, data)
  end
  sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values.join(',')}"
  sql += @on_duplicate_key_update_sql if @on_duplicate_key_update

  $log.info "bulk insert values size => #{values.size}"
  @handler.xquery(sql)
  @handler.close
end