Class: Fluent::Plugin::PostgresOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_postgres.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



22
23
24
# File 'lib/fluent/plugin/out_postgres.rb', line 22

# Reader for the @handler instance variable.
#
# NOTE(review): none of the methods visible here assign @handler (#write keeps
# its connection in a local variable that merely shares the name), so this may
# always return nil — confirm against the rest of the class.
def handler
  @handler
end

Instance Method Details

#client ⇒ Object



65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_postgres.rb', line 65

# Builds a new PG::Connection from the plugin's connection settings.
#
# The settings are read from @host, @port, @username, @password and @database
# and handed to PG::Connection.new as a single positional Hash.
#
# @return [PG::Connection] whatever PG::Connection.new returns for those settings
def client
  connection_params = {
    host: @host,
    port: @port,
    user: @username,
    password: @password,
    dbname: @database
  }
  PG::Connection.new(connection_params)
end

#configure(conf) ⇒ Object

We don’t currently support MySQL’s analogous JSON format.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_postgres.rb', line 25

# Validates the plugin configuration and prepares the record formatter.
#
# After the superclass has processed +conf+, this sets @format_proc:
# * when @format is 'json', each record is serialised with +to_json+;
# * otherwise @key_names (a comma-separated string) is split into an Array and
#   each record is reduced to the values of those keys, in that order.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [Fluent::ConfigError] if key_names is missing for a non-json format,
#   if neither columns nor sql is set, or if both are set
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  if @format == 'json'
    @format_proc = proc { |_tag, _time, record| record.to_json }
  else
    # Without this guard a missing key_names surfaces as a NoMethodError on
    # nil.split rather than as a readable configuration error.
    if @key_names.nil?
      raise Fluent::ConfigError, "key_names MUST be specified unless format is json, but missing"
    end

    @key_names = @key_names.split(/\s*,\s*/)
    @format_proc = proc { |_tag, _time, record| @key_names.map { |key| record[key] } }
  end

  # Exactly one of columns / sql must be configured.
  if @columns.nil? && @sql.nil?
    raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
  end
  if @columns && @sql
    raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
  end
end

#format(tag, time, record) ⇒ Object



52
53
54
55
# File 'lib/fluent/plugin/out_postgres.rb', line 52

# Serialises one event for buffering.
#
# The record is first passed through +inject_values_to_record+, then reduced
# by @format_proc; the result is packed together with the tag and time as a
# three-element Array via +to_msgpack+.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the +to_msgpack+ encoding of [tag, time, formatted_record]
def format(tag, time, record)
  enriched = inject_values_to_record(tag, time, record)
  payload = @format_proc.call(tag, time, enriched)
  [tag, time, payload].to_msgpack
end

#formatted_to_msgpack_binary? ⇒ Boolean

Returns:



61
62
63
# File 'lib/fluent/plugin/out_postgres.rb', line 61

# Always true: #format emits its output through +to_msgpack+ and #write reads
# chunks back with +msgpack_each+.
#
# @return [Boolean] always +true+
def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:



57
58
59
# File 'lib/fluent/plugin/out_postgres.rb', line 57

# Always true.
# NOTE(review): presumably tells Fluentd this plugin may run under multiple
# workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always +true+
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



48
49
50
# File 'lib/fluent/plugin/out_postgres.rb', line 48

# Shutdown hook. Adds no behavior of its own; it only defers to the
# superclass implementation via +super+.
def shutdown
  super
end

#start ⇒ Object



44
45
46
# File 'lib/fluent/plugin/out_postgres.rb', line 44

# Start hook. Adds no behavior of its own; it only defers to the
# superclass implementation via +super+.
def start
  super
end

#write(chunk) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_postgres.rb', line 73

# Writes every event of a buffered chunk through one prepared statement.
#
# Obtains a connection from #client, prepares @sql under the statement name
# "write", then executes that statement once per event, passing the event's
# formatted data as the statement parameters.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, data) triples
# @raise re-raises whatever +prepare+ or +exec_prepared+ raises, after the
#   connection has been closed
def write(chunk)
  handler = client
  begin
    handler.prepare("write", @sql)
    chunk.msgpack_each do |_tag, _time, data|
      handler.exec_prepared("write", data)
    end
  ensure
    # Close the connection on every path; without this a failure in prepare or
    # exec_prepared would skip the close and leak the connection.
    handler.close
  end
end