Class: Fluent::DbiOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_dbi.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DbiOutput

Returns a new instance of DbiOutput.



12
13
14
15
16
# File 'lib/fluent/plugin/out_dbi.rb', line 12

# Runs the parent initializer, then loads the 'dbi' library.
def initialize
  super

  # Required here rather than at file load time, so the library is only
  # loaded once an instance is actually created.
  require 'dbi'
end

Instance Method Details

#configure(conf) ⇒ Object



18
19
20
21
22
# File 'lib/fluent/plugin/out_dbi.rb', line 18

# Applies the configuration via the parent class, then converts the
# comma-separated @keys setting into an array of key names.
#
# Whitespace around each name is stripped, so a setting written as
# "a, b" produces ["a", "b"] instead of ["a", " b"]; a key with stray
# padding would never match a record field when values are looked up.
#
# NOTE(review): assumes @keys holds a String once `super` returns
# (presumably populated from the configuration) -- confirm against the
# parameter declarations in the class body.
#
# @param conf [Object] configuration, forwarded unchanged to the parent
# @return [Array<String>] the parsed key names
def configure(conf)
  super

  @keys = @keys.split(',').map(&:strip)
end

#format(tag, time, record) ⇒ Object



24
25
26
# File 'lib/fluent/plugin/out_dbi.rb', line 24

# Serializes one event as a [tag, time, record] triple by calling
# #to_msgpack on the array.
#
# @param tag [Object] the event tag
# @param time [Object] the event time
# @param record [Object] the event payload
# @return [Object] whatever Array#to_msgpack returns for the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#write(chunk) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_dbi.rb', line 28

# Writes every event in the chunk to the database inside one transaction.
#
# For each event the record is given 'time' and 'tag' entries when it does
# not already have them, then the values for @keys (in order) are bound to
# the prepared @query and executed. All rows are committed together; any
# StandardError rolls the transaction back and is re-raised to the caller.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return [Object] the result of disconnecting the database handle's
#   enclosing block (not meaningful to callers)
# @raise [StandardError] whatever the database layer raised, after rollback
def write(chunk)
  dbh = DBI.connect(@dsn, @db_user, @db_pass)
  begin
    # Explicit transaction: nothing is persisted until the commit below.
    dbh['AutoCommit'] = false
    sth = dbh.prepare(@query)
    begin
      chunk.msgpack_each do |tag, time, record|
        # Only fill in defaults; a record's own 'time'/'tag' fields win.
        record['time'] = time unless record.key?('time')
        record['tag'] = tag unless record.key?('tag')
        sth.execute(*@keys.map { |key| record[key] })
      end
    ensure
      # Release the statement even when an execute fails part-way through.
      sth.finish
    end
    dbh.commit
  rescue
    dbh.rollback
    raise
  ensure
    dbh.disconnect
  end
end