Class: Fluent::Plugin::PostgresBulkOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_postgres_bulk.rb

Constant Summary collapse

N_PARAMETER_MAX =

number of parameters must be between 0 and 65535

65535

Instance Method Summary collapse

Constructor Details

#initializePostgresBulkOutput

Returns a new instance of PostgresBulkOutput.



26
27
28
29
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 26

def initialize
  super
  require 'pg'
end

Instance Method Details

#clientObject



31
32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 31

def client
  PG.connect(
    host: @host,
    port: @port,
    dbname: @database,
    user: @username,
    password: @password,
  )
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 41

def multi_workers_ready?
  true
end

#write(chunk) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 45

def write(chunk)
  handler = client()
  values = build_values(chunk)
  max_slice = N_PARAMETER_MAX / @column_names.length * @column_names.length
  values.each_slice(max_slice) do |slice|
    place_holders = build_place_holders(slice)
    query = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES #{place_holders}"
    t = Benchmark.realtime {
      handler.exec_params(query, slice)
    }
    log.info("(table: #{table}) inserted #{values.length / @column_names.length} records in #{(t * 1000).to_i} ms")
  end
ensure
  handler.close
end