Class: Fluent::Plugin::PgJsonOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_pgjson.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ PgJsonOutput

Returns a new instance of PgJsonOutput.



46
47
48
49
# File 'lib/fluent/plugin/out_pgjson.rb', line 46

# Builds the plugin instance. The parent initializer runs first, then the
# PostgreSQL connection slot is cleared so that no connection is held
# until one is established later.
def initialize
  super()
  @conn = nil
end

Instance Method Details

#configure(conf) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_pgjson.rb', line 51

# Applies the plugin configuration.
#
# Converts legacy buffer parameters, lets the parent class process +conf+,
# then checks that the chunk keys include the tag and replaces the
# configured encoder symbol with the module that implements it.
#
# @param conf the configuration element handed to the plugin
# @raise [Fluent::ConfigError] when 'tag' is not part of the chunk keys
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  # Swap the symbolic encoder name for the encoder module itself; any other
  # value leaves @encoder as nil.
  @encoder =
    if @encoder == :yajl
      Yajl
    elsif @encoder == :json
      JSON
    end
end

#format(tag, time, record) ⇒ Object



81
82
83
# File 'lib/fluent/plugin/out_pgjson.rb', line 81

# Serializes one event for buffering.
#
# The event time is rendered as a string using @time_format and packed
# together with the record via to_msgpack. The tag is not part of the
# serialized payload.
#
# @param tag the event tag (unused here)
# @param time the event time, passed to Time.at
# @param record the event record
# @return the msgpack-encoded [formatted_time, record] pair
def format(tag, time, record)
  timestamp = Time.at(time).strftime(@time_format)
  [timestamp, record].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



73
74
75
# File 'lib/fluent/plugin/out_pgjson.rb', line 73

# Always true. The #format method of this plugin emits its output through
# to_msgpack; presumably this flag tells Fluentd that chunks can be read
# back with msgpack_each (confirm against the Fluentd output plugin API).
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/fluent/plugin/out_pgjson.rb', line 77

# Always true; presumably declares to Fluentd that this plugin may run
# under multiple workers (confirm against the Fluentd plugin API).
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_pgjson.rb', line 65

# Closes the PostgreSQL connection, if one exists and has not already been
# finished, and then hands over to the parent class's shutdown.
def shutdown
  @conn.close unless @conn.nil? || @conn.finished?

  super
end

#write(chunk) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_pgjson.rb', line 85

# Writes one buffer chunk to PostgreSQL using COPY ... FROM STDIN.
#
# Every event in the chunk becomes one COPY row of the form
# tag, time, record, with the fields separated by the \x01 byte. The tag is
# taken from the chunk's metadata (configure requires 'tag' to be a chunk
# key).
#
# On any failure the connection is closed and @conn is reset to nil, and
# the error is re-raised to the caller.
#
# @param chunk the buffer chunk; must respond to metadata.tag and
#   msgpack_each
# @raise [PG::UnableToSend] when data cannot be sent to the server
# @raise [RuntimeError] with the server's error message when the COPY does
#   not finish with PGRES_COMMAND_OK
def write(chunk)
  init_connection
  begin
    @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'")
    # Read the tag from the chunk metadata. The previous `chunk..tag` built a
    # Range object instead of reading the tag.
    tag = chunk.metadata.tag
    chunk.msgpack_each do |time, record|
      @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n"
    end
  rescue PG::UnableToSend
    # Data could not be sent, so no attempt is made to end the COPY; the
    # connection is simply dropped.
    @conn.close
    @conn = nil
    raise
  rescue => err
    # Abort the in-progress COPY with an explanatory message, drain its
    # result, then drop the connection before re-raising the original error.
    errmsg = "%s while copy data: %s" % [err.class.name, err.message]
    @conn.put_copy_end(errmsg)
    @conn.get_result
    @conn.close
    @conn = nil
    raise
  else
    @conn.put_copy_end
    res = @conn.get_result
    if res.result_status != PG::PGRES_COMMAND_OK
      @conn.close
      @conn = nil
      raise res.result_error_message
    end
  end
end