Class: Fluent::PgJsonOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_pgjson.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ PgJsonOutput

Returns a new instance of PgJsonOutput.



18
19
20
21
22
# File 'lib/fluent/plugin/out_pgjson.rb', line 18

# Builds a new output instance with no database connection yet.
#
# Runs the parent initializer first, then loads the `pg` library at
# instantiation time (rather than at file load), and finally resets the
# connection handle so that nothing is treated as connected.
def initialize
  super
  # Loaded here, inside the constructor, so `pg` is only required once an
  # instance is actually created.
  require 'pg'
  # No connection is opened by the constructor; the handle starts out empty.
  @conn = nil
end

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
# File 'lib/fluent/plugin/out_pgjson.rb', line 24

# Applies the plugin configuration.
#
# This method adds no logic of its own; the configuration is handed
# straight to the parent implementation and its result is returned.
#
# @param conf the configuration object passed through to the parent class
# @return whatever the parent `configure` returns
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object



36
37
38
# File 'lib/fluent/plugin/out_pgjson.rb', line 36

# Serializes a single event for buffering.
#
# The tag, time and record are bundled, in that order, into a three-element
# array, which is then converted with `to_msgpack`.
#
# @param tag the event tag
# @param time the event time
# @param record the event payload
# @return the result of calling `to_msgpack` on `[tag, time, record]`
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_pgjson.rb', line 28

# Shuts the plugin down and releases the database connection.
#
# The parent shutdown runs first. Afterwards the connection is closed, but
# only when one exists and it has not already been finished.
def shutdown
  super

  # Nothing to release when there is no connection or it is already finished.
  return if @conn.nil? || @conn.finished?

  @conn.close
end

#write(chunk) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_pgjson.rb', line 40

# Loads one buffered chunk into the database through COPY ... FROM STDIN.
#
# After `init_connection`, a COPY statement is issued for the configured
# table and its tag/time/record columns, using the \x01 byte as the column
# delimiter. Every event in the chunk is then streamed as one line:
# tag, `Time.at(time)` as text, and `record_value(record)`.
#
# If streaming raises, the COPY is ended with a message naming the error,
# the pending result is consumed, and the original exception is re-raised.
# Otherwise the COPY is ended normally and the result status is checked.
#
# @param chunk an object whose `msgpack_each` yields (tag, time, record)
# @raise [RuntimeError] with the result's error message when the final
#   status is not PG::PGRES_COMMAND_OK
def write(chunk)
  init_connection
  copy_sql = "COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'"
  @conn.exec(copy_sql)
  begin
    chunk.msgpack_each do |tag, time, record|
      row = "#{tag}\x01#{Time.at(time)}\x01#{record_value(record)}\n"
      @conn.put_copy_data(row)
    end
  rescue => failure
    # End the COPY with an error message, consume the pending result, then
    # let the original exception propagate.
    abort_reason = "%s while copy data: %s" % [failure.class.name, failure.message]
    @conn.put_copy_end(abort_reason)
    @conn.get_result
    raise
  else
    @conn.put_copy_end
    outcome = @conn.get_result
    raise outcome.result_error_message unless outcome.result_status == PG::PGRES_COMMAND_OK
  end
end