Class: Fluent::Plugin::PgJsonOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_pgjson.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ PgJsonOutput

Returns a new instance of PgJsonOutput.



34
35
36
37
# File 'lib/fluent/plugin/out_pgjson.rb', line 34

# Runs the parent initializer, then marks the connection handle as not yet
# opened by setting it to nil.
def initialize
  super()
  @conn = nil
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_pgjson.rb', line 39

# Applies the configuration, insists that 'tag' is one of the chunk keys,
# and swaps the configured encoder symbol for the module of the same name.
#
# @param conf configuration object; passed to compat_parameters_convert
#   and, implicitly, to the parent implementation
# @raise [Fluent::ConfigError] when @chunk_key_tag is not set after the
#   parent implementation has run
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  # Only the branch that matches is evaluated, so the constant of the
  # encoder that was not selected is never referenced. Any other value
  # leaves @encoder as nil.
  @encoder = case @encoder
             when :yajl then Yajl
             when :json then JSON
             end
end

#format(tag, time, record) ⇒ Object



69
70
71
# File 'lib/fluent/plugin/out_pgjson.rb', line 69

# Serializes one event as a msgpack-encoded two-element array:
# the time rendered with @time_format, followed by the record.
#
# @param tag not used by this method
# @param time event time; converted with Time.at
# @param record event record; placed in the array unchanged
# @return the result of calling to_msgpack on the [time_string, record] pair
def format(tag, time, record)
  timestamp = Time.at(time).strftime(@time_format)
  [timestamp, record].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



61
62
63
# File 'lib/fluent/plugin/out_pgjson.rb', line 61

# Always returns true.
# NOTE(review): presumably tells the framework that #format already emits
# msgpack binary (it ends in to_msgpack) — confirm against the Fluentd
# Output plugin API.
def formatted_to_msgpack_binary
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/fluent/plugin/out_pgjson.rb', line 65

# Always returns true.
# NOTE(review): presumably declares the plugin usable when Fluentd runs
# with multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_pgjson.rb', line 53

# Closes the connection when one exists and has not already been finished,
# then hands over to the parent implementation.
def shutdown
  @conn.close unless @conn.nil? || @conn.finished?

  super
end

#write(chunk) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_pgjson.rb', line 73

# Sends every event of a buffer chunk to the database through one
# COPY ... FROM STDIN statement.
#
# Each event becomes one COPY row: tag, time and the value returned by
# record_value, separated by the \x01 delimiter declared in the statement.
#
# @param chunk buffer chunk to flush; must respond to #metadata (whose
#   result responds to #tag) and to #msgpack_each yielding time and record
# @raise [RuntimeError] carrying the result's error message when the COPY
#   does not end with PG::PGRES_COMMAND_OK
# @raise re-raises any StandardError hit while sending rows, after ending
#   the COPY with an error message
def write(chunk)
  init_connection
  @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'")
  begin
    # The tag is read from the chunk's metadata. (The previous
    # `chunk..tag` built a Range instead of reading the tag.)
    tag = chunk.metadata.tag
    chunk.msgpack_each do |time, record|
      @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n"
    end
  rescue => err
    # End the COPY with an error message and drain its result before
    # re-raising. NOTE(review): per the pg gem docs, passing a message
    # forces the COPY to fail — confirm.
    errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
    @conn.put_copy_end( errmsg )
    @conn.get_result
    raise
  else
    @conn.put_copy_end
    res = @conn.get_result
    raise res.result_error_message if res.result_status!=PG::PGRES_COMMAND_OK
  end
end