Class: Fluent::Plugin::TimescaleDB

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_timescaledb.rb

Instance Method Summary collapse

Instance Method Details

#closeObject



18
19
20
# File 'lib/fluent/plugin/out_timescaledb.rb', line 18

def close
  @conn.close if @conn and !@conn.finished?
end

#format(tag, time, record) ⇒ Object



33
34
35
# File 'lib/fluent/plugin/out_timescaledb.rb', line 33

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#formatted_to_msgpack_binaryObject



37
38
39
# File 'lib/fluent/plugin/out_timescaledb.rb', line 37

def formatted_to_msgpack_binary
  true
end

#startObject



12
13
14
15
16
# File 'lib/fluent/plugin/out_timescaledb.rb', line 12

def start
  super

  @conn = PG.connect(@db_conn_string)
end

#write(chunk) ⇒ Object



22
23
24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_timescaledb.rb', line 22

def write(chunk)
  reconnect_if_connection_bad!

  values = []
  chunk.msgpack_each do | tag, time, record |
    values << "('#{@conn.escape_string(format_time(time))}','#{@conn.escape_string(tag)}','#{@conn.escape_string(record.to_json)}'::jsonb)"
  end

  @conn.exec("INSERT INTO #{@conn.escape_identifier(@db_table_name)} (time, tag, record) VALUES #{values.join(',')}")
end