Class: Fluent::EventCounterOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::EventCounterOutput
- Defined in:
- lib/fluent/plugin/out_eventcounter.rb
Instance Attribute Summary collapse
-
#counts ⇒ Object
Returns the value of attribute counts.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ EventCounterOutput
constructor
A new instance of EventCounterOutput.
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ EventCounterOutput
9 10 11 12 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 9 def initialize super require 'redis' end |
Instance Attribute Details
#counts ⇒ Object
Returns the value of attribute counts.
32 33 34 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 32 def counts @counts end |
Instance Method Details
#configure(conf) ⇒ Object
34 35 36 37 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 34 def configure(conf) super @capture_extra_replace = Regexp.new(@capture_extra_replace) if @capture_extra_replace.length > 0 end |
#format(tag, time, record) ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 65 def format(tag, time, record) return '' unless record[@count_key] if @capture_extra_if && record[@capture_extra_if] extra = record[@capture_extra_if].gsub(@capture_extra_replace, '') [tag.gsub(@input_tag_exclude,""), [record[@count_key], extra].compact.join(':')].to_json + "\n" else [tag.gsub(@input_tag_exclude,""), record[@count_key]].to_json + "\n" end end |
#start ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 39 def start super unless @emit_only @redis = begin if @redis_sentinels.length > 0 sentinels = @redis_sentinels.map {|host| {host: host, port: @redis_port} } Redis.new( url: "redis://#{@redis_master_group_name}", sentinels: sentinels, password: @redis_password, thread_safe: true, role: :master ) else Redis.new( host: @redis_host, port: @redis_port, password: @redis_password, thread_safe: true, db: @redis_db_number ) end end end end |
#write(chunk) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin/out_eventcounter.rb', line 76 def write(chunk) counts = Hash.new {|hash, key| hash[key] = Hash.new {|h,k| h[k] = 0 } } chunk.open do |io| items = io.read.split("\n") items.each do |item| key, event = JSON.parse(item) counts[key][event] += 1 end end @redis.pipelined do counts.each do |tag,events| events.each do |event, c| redis_key = [@redis_output_key,tag].join(':') @redis.hincrby(redis_key, event, c.to_i) end end end unless @emit_only if @emit_only || @debug_emit counts.each do |tag, events| router.emit(@emit_to, Time.now, tag => events) end end end |