Class: Fluent::CratedbOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::CratedbOutput
- Includes:
- SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_cratedb.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 25 def configure(conf) super if @column_names.nil? fail Fluent::ConfigError, 'column_names MUST specified, but missing' end @column_names = @column_names.split(',').collect(&:strip) @key_names = @key_names.nil? ? @column_names : @key_names.split(',').collect(&:strip) @servers = @hosts ? @hosts : ["#{@host}:#{@port}"] end |
#format(tag, time, record) ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 49 def format(tag, time, record) [tag, time, format_proc.call(tag, time, record)].to_msgpack end |
#shutdown ⇒ Object
45 46 47 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 45 def shutdown super end |
#start ⇒ Object
38 39 40 41 42 43 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 38 def start super require 'logger' opts = {:http_options => , :logger => Logger.new(STDERR)} @client = CrateClient.new(@servers, opts) end |
#write(chunk) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 53 def write(chunk) values = [] chunk.msgpack_each do |tag, time, data| #data = format_proc.call(tag, time, data) values << data end sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES (#{ @column_names.map { |key| '?' }.join(',') })" #log.info(sql) @client.execute(sql, nil, values, ) #digest = Digest::SHA1.hexdigest(data) #@client.blob_put(table_name, digest, data) end |