Class: Fluent::MysqlBulkOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_mysql_bulk.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MysqlBulkOutput

Returns a new instance of MysqlBulkOutput.



21
22
23
24
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 21

# Runs the parent class initializer, then loads the 'mysql2-cs-bind' library.
def initialize
  super
  # Required at instantiation time rather than at file load.
  # NOTE(review): presumably this gem supplies the `xquery` / `pseudo_bind`
  # calls used by #write — confirm against the gem's documentation.
  require 'mysql2-cs-bind'
end

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



19
20
21
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 19

# Reader for the @handler instance variable (assigned in #start from #client).
# Returns nil if #start has not assigned it yet.
def handler
  @handler
end

Instance Method Details

#client ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 66

# Builds a fresh Mysql2::Client from the configured connection settings
# (@host, @port, @username, @password, @database). The MULTI_STATEMENTS
# client flag is always passed. Every call constructs a new client object.
def client
  connection_options = {
    host: @host,
    port: @port,
    username: @username,
    password: @password,
    database: @database,
    flags: Mysql2::Client::MULTI_STATEMENTS
  }
  Mysql2::Client.new(connection_options)
end

#configure(conf) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 26

# Validates the plugin settings and precomputes what #format and #write need.
#
# After the parent `configure` has run, this:
# * requires `column_names` to be set;
# * when `on_duplicate_key_update` is enabled, requires
#   `on_duplicate_update_keys` and builds the " ON DUPLICATE KEY UPDATE ..."
#   SQL suffix from it;
# * splits the comma-separated `column_names` / `key_names` /
#   `on_duplicate_update_keys` strings into arrays, trimming whitespace around
#   each entry so that "id, name" yields ["id", "name"] rather than
#   ["id", " name"] (an untrimmed key would never match a record key);
# * builds @format_proc, which projects a record onto @key_names.
#
# `key_names` falls back to `column_names` when it is not given.
#
# @param conf the configuration object, forwarded to the parent class
# @raise [Fluent::ConfigError] if a required setting is missing
def configure(conf)
  super

  if @column_names.nil?
    raise Fluent::ConfigError, "column_names MUST be specified, but missing"
  end

  if @on_duplicate_key_update
    if @on_duplicate_update_keys.nil?
      raise Fluent::ConfigError, "on_duplicate_key_update = true , on_duplicate_update_keys nil!"
    end
    @on_duplicate_update_keys = @on_duplicate_update_keys.split(',').map(&:strip)

    # One "col = VALUES(col)" clause per key; the leading space on each clause
    # is kept so the generated SQL text matches what was produced before.
    updates = @on_duplicate_update_keys.map { |update_column|
      " #{update_column} = VALUES(#{update_column})"
    }
    @on_duplicate_key_update_sql = " ON DUPLICATE KEY UPDATE " + updates.join(',')
  end

  @column_names = @column_names.split(',').map(&:strip)
  @key_names = @key_names.nil? ? @column_names : @key_names.split(',').map(&:strip)
  @format_proc = Proc.new { |tag, time, record| @key_names.map { |k| record[k] } }
end

#format(tag, time, record) ⇒ Object



62
63
64
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 62

# Serializes one event for buffering: runs @format_proc over the event to get
# its row of values, then returns the msgpack encoding of [tag, time, row].
def format(tag, time, record)
  row = @format_proc.call(tag, time, record)
  [tag, time, row].to_msgpack
end

#shutdown ⇒ Object



58
59
60
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 58

# Runs the parent shutdown, then closes the client stored in @handler so the
# connection opened in #start is not left open.
#
# The close happens in `ensure`, so it runs even if the parent shutdown
# raises, and the method still returns whatever the parent shutdown returned.
# It runs after `super` because the parent shutdown may still need the
# connection (NOTE(review): presumably to flush buffered chunks through
# #write — confirm against the base class).
def shutdown
  super
ensure
  @handler.close if @handler
end

#start ⇒ Object



53
54
55
56
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 53

# Stores a client built by #client in @handler, then runs the parent start.
def start
  # The client is assigned before `super` runs.
  # NOTE(review): presumably so the connection exists before the parent class
  # begins flushing chunks through #write — confirm against the base class.
  @handler = client
  super
end

#write(chunk) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_mysql_bulk.rb', line 77

# Writes one buffered chunk as a single multi-row INSERT statement.
#
# Each entry yielded by `chunk.msgpack_each` contributes one "(?,?,...)"
# placeholder group and appends its data array to the flat list of bind
# values. When @on_duplicate_key_update is set, the precomputed
# @on_duplicate_key_update_sql suffix is appended. The statement (with values
# substituted via Mysql2::Client.pseudo_bind) is logged at info level and then
# executed through @handler.xquery.
#
# @param chunk an object responding to `msgpack_each`, yielding
#   (tag, time, data) where data is the array of values for one row
# @return the result of `@handler.xquery`, or nil when the chunk yields no
#   entries (nothing is logged or executed in that case)
def write(chunk)
  # Every row uses the same placeholder group, so build it once instead of
  # once per record.
  row_template = "(#{(['?'] * @column_names.size).join(',')})"

  values_templates = []
  values = []
  chunk.msgpack_each { |tag, time, data|
    values_templates.push(row_template)
    values.concat(data)
  }

  # With no rows the statement would end in "VALUES " with no tuples, which is
  # not valid SQL; skip the query instead of sending it.
  return if values_templates.empty?

  sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{values_templates.join(',')}"
  sql += @on_duplicate_key_update_sql if @on_duplicate_key_update

  $log.info "bulk insert sql => [#{Mysql2::Client.pseudo_bind(sql, values)}]"
  @handler.xquery(sql, values)
end