Class: Fluent::Plugin::PostgresBulkOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::PostgresBulkOutput
- Defined in:
- lib/fluent/plugin/out_postgres_bulk.rb
Constant Summary collapse
- N_PARAMETER_MAX = 65535
  The number of parameters must be between 0 and 65535.
Instance Method Summary collapse
- #client ⇒ Object
- #initialize ⇒ PostgresBulkOutput (constructor)
  A new instance of PostgresBulkOutput.
- #multi_workers_ready? ⇒ Boolean
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ PostgresBulkOutput
Returns a new instance of PostgresBulkOutput.
26 27 28 29 |
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 26
#
# Builds the plugin instance: runs the parent initializer, then loads the
# 'pg' library.
def initialize
  super
  # Required here rather than at file load time, so the library is only
  # loaded once an instance is actually created.
  require 'pg'
end
Instance Method Details
#client ⇒ Object
31 32 33 34 35 36 37 38 39 |
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 31
#
# Opens a connection with PG.connect, using the @host, @port, @database,
# @username and @password instance variables.
#
# Every call invokes PG.connect again; nothing is cached in this method.
#
# @return [Object] whatever PG.connect returns
def client
  connection_options = {
    host: @host,
    port: @port,
    dbname: @database,
    user: @username,
    password: @password
  }
  PG.connect(**connection_options)
end
#multi_workers_ready? ⇒ Boolean
41 42 43 |
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 41
#
# Always answers true.
# NOTE(review): presumably the Fluentd hook that declares the plugin usable
# with multiple workers — confirm against the Output base class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#write(chunk) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_postgres_bulk.rb', line 45
#
# Inserts the values built from +chunk+ into @table, issuing one multi-row
# INSERT per slice of bound parameters.
#
# A connection is obtained from #client on every call and closed afterwards,
# whether or not the inserts succeed. One info line is logged per executed
# statement, with the row count and elapsed time of that statement.
#
# @param chunk [Object] passed unchanged to build_values
# @return [Object] the result of values.each_slice
# @raise whatever #client, build_values or exec_params raise; the error is
#   propagated unchanged
def write(chunk)
  handler = client
  values = build_values(chunk)
  column_count = @column_names.length
  columns = @column_names.join(',')
  # Largest multiple of the column count not exceeding N_PARAMETER_MAX, so a
  # slice never ends in the middle of a row (assumes build_values returns a
  # flat list with one entry per column per row — confirm).
  max_slice = N_PARAMETER_MAX / column_count * column_count

  values.each_slice(max_slice) do |slice|
    place_holders = build_place_holders(slice)
    query = "INSERT INTO #{@table} (#{columns}) VALUES #{place_holders}"
    elapsed = Benchmark.realtime { handler.exec_params(query, slice) }
    # Count the rows of this slice only, so the figure matches the
    # per-statement timing reported next to it.
    log.info("(table: #{@table}) inserted #{slice.length / column_count} records in #{(elapsed * 1000).to_i} ms")
  end
ensure
  # handler is still nil when #client itself raised; calling close on nil
  # would replace the real connection error with a NoMethodError.
  handler&.close
end