Class: Fluent::FiveRocksOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_5rocks.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FiveRocksOutput

Returns a new instance of FiveRocksOutput.



8
9
10
11
12
13
# File 'lib/fluent/plugin/out_5rocks.rb', line 8

# Runs the superclass constructor, then loads the standard libraries
# this plugin references elsewhere (Net::HTTP, URI and Time.parse).
# The libraries are required here, at instantiation time, in the order
# listed.
def initialize
  super
  %w[net/http uri time].each { |library| require library }
end

Instance Attribute Details

#field_map ⇒ Object (readonly)

Returns the value of attribute field_map.



6
7
8
# File 'lib/fluent/plugin/out_5rocks.rb', line 6

# Reader for the +@field_map+ instance variable, which #configure
# assigns as a Hash of request parameter name => configured value.
#
# @return [Object] the current value of +@field_map+ (nil until assigned)
def field_map
  @field_map
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



6
7
8
# File 'lib/fluent/plugin/out_5rocks.rb', line 6

# Reader for the +@url+ instance variable, which #configure builds by
# appending the configured +app_id+ to +@url_base+.
#
# @return [Object] the current value of +@url+ (nil until assigned)
def url
  @url
end

Instance Method Details

#configure(conf) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_5rocks.rb', line 15

# Derives the request URL and the request-parameter map from the
# plugin configuration.
#
# * +@url+ is +@url_base+ followed by the +app_id+ setting.
# * +@field_map+ starts with +app_key+ and +name+ (plus +category+
#   when it is set), then receives every key of each "field" child
#   element, and every key of each "values" element nested inside a
#   "field" element under the name <tt>values[KEY]</tt>.
#
# @param conf [#[], #elements] configuration element; child elements
#   must respond to +name+, +each_key+, +[]+ and +elements+
# @return [Array] the "field" child elements that were processed
def configure(conf)
  super

  @url = "#{@url_base}#{conf['app_id']}"

  @field_map = { 'app_key' => conf['app_key'], 'name' => conf['name'] }
  category = conf['category']
  @field_map['category'] = category if category

  field_sections = conf.elements.select { |element| element.name == "field" }
  field_sections.each do |field|
    # Attributes written directly on the <field> element.
    field.each_key { |key| @field_map[key] = field[key] }

    # Attributes of nested <values> elements are namespaced as values[KEY].
    value_sections = field.elements.select { |child| child.name == "values" }
    value_sections.each do |values|
      values.each_key { |key| @field_map["values[#{key}]"] = values[key] }
    end
  end
end

#format(tag, time, record) ⇒ Object



44
45
46
# File 'lib/fluent/plugin/out_5rocks.rb', line 44

# Serializes one event for buffering.
#
# The event is packed as the three-element array [tag, time, record]
# by calling +to_msgpack+ on it (presumably supplied by the msgpack
# library loaded elsewhere; it is not defined in this method).
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever Array#to_msgpack returns for the event
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object



40
41
42
# File 'lib/fluent/plugin/out_5rocks.rb', line 40

# Shutdown hook. Adds no behavior of its own; it only delegates to the
# superclass implementation.
#
# @return [Object] whatever the superclass #shutdown returns
def shutdown
  super
end

#start ⇒ Object



36
37
38
# File 'lib/fluent/plugin/out_5rocks.rb', line 36

# Start hook. Adds no behavior of its own; it only delegates to the
# superclass implementation.
#
# @return [Object] whatever the superclass #start returns
def start
  super
end

#write(chunk) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/fluent/plugin/out_5rocks.rb', line 48

# Posts every record of a buffered chunk to +@url+, one HTTP form POST
# per record.
#
# Request parameters are built from +@field_map+ for each record:
# * a value consisting of exactly one placeholder, <tt>$(key)</tt>,
#   becomes <tt>record[key]</tt> with its original type;
# * any other string has every <tt>$(key)</tt> replaced by the record
#   value (converted to a string by +gsub+);
# * a +nil+ value stays +nil+.
#
# The "time" parameter (falling back to the event time from the chunk)
# is normalized to integer milliseconds: strings go through
# Time.parse, Date/Time objects are converted to epoch milliseconds,
# and a bare number is interpreted as whichever of seconds,
# milliseconds or microseconds lands closest to the current time.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return [Array<Hash>] the parameter hashes that were posted, in order
# @raise [RuntimeError] as soon as a response code other than "201" is
#   received; records earlier in the chunk have already been posted
def write(chunk)
  endpoint = URI.parse(@url) # identical for every record, so parse it once
  posted = []

  chunk.msgpack_each do |tag, time, record|
    log.debug "tag: #{tag}, record: #{record}"

    params = @field_map.each_with_object({}) do |(key, template), acc|
      acc[key] =
        if /^\$\(([^)]+)\)$/ =~ template
          record[$1] # can be ::String, ::Numeric, etc.
        elsif template
          template.gsub(/\$\(([^)]+)\)/) { record[$1] } # ::String
        end # a nil template falls through and stays nil
    end

    t = params["time"] || time
    t = Time.parse(t) if t.is_a?(::String)
    t = t.to_time if t.is_a?(::Date) # Date has no #to_f; go through Time
    t = t.to_f * 1000 if t.is_a?(::Time)

    # Guess the unit of a numeric timestamp: pick the scaling whose
    # value is nearest to "now" expressed in milliseconds.
    now_ms = Time.now.to_f * 1000
    t = [t / 1000, t, t * 1000].min_by { |candidate| (now_ms - candidate).abs }
    params["time"] = t.to_i

    log.debug "request parameters: #{params}"
    res = Net::HTTP.post_form(endpoint, params)
    log.debug "response code: #{res.code}"
    log.debug "response body: #{res.body}"

    posted << params
    raise "failed to insert into 5rocks" unless res.code == "201"
  end

  posted
end