Class: Fluent::EventCounterOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_eventcounter.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventCounterOutput



9
10
11
12
# File 'lib/fluent/plugin/out_eventcounter.rb', line 9

# Builds the plugin instance.
#
# Delegates to the parent class initializer and then loads the `redis`
# library, so the library is required when an instance is constructed.
def initialize
  super
  require 'redis'
end

Instance Attribute Details

#counts ⇒ Object

Returns the value of attribute counts.



32
33
34
# File 'lib/fluent/plugin/out_eventcounter.rb', line 32

# Reader for the @counts instance variable.
#
# @return [Object] the current value of @counts
def counts
  @counts
end

Instance Method Details

#configure(conf) ⇒ Object



34
35
36
37
# File 'lib/fluent/plugin/out_eventcounter.rb', line 34

# Applies the configuration and prepares the extra-capture pattern.
#
# After the parent implementation has run, a non-empty
# @capture_extra_replace is compiled into a Regexp; an empty value is left
# untouched.
#
# @param conf [Object] configuration handed on to the parent implementation
# @return [Regexp, nil] the compiled pattern, or nil when nothing was compiled
def configure(conf)
  super
  return if @capture_extra_replace.empty?

  @capture_extra_replace = Regexp.new(@capture_extra_replace)
end

#format(tag, time, record) ⇒ Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/out_eventcounter.rb', line 65

# Serializes one event into a newline-terminated JSON pair for the buffer.
#
# The pair is [tag with @input_tag_exclude removed, counter name]. When
# @capture_extra_if is set and the record holds a truthy value under that
# key, that value (with @capture_extra_replace matches removed) is appended
# to the counter name after a ':'.
#
# @param tag [String] tag of the incoming event
# @param time [Object] event time (not used here)
# @param record [Hash] event record
# @return [String] one JSON line, or '' when the record has no truthy value
#   under @count_key
def format(tag, time, record)
  counter = record[@count_key]
  return '' unless counter

  extra_value = @capture_extra_if && record[@capture_extra_if]
  if extra_value
    # Record values are not guaranteed to be Strings (e.g. a numeric status
    # code); coerce first so gsub cannot raise NoMethodError.
    extra = extra_value.to_s.gsub(@capture_extra_replace, '')
    counter = [counter, extra].join(':')
  end

  [tag.gsub(@input_tag_exclude, ''), counter].to_json + "\n"
end

#start ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_eventcounter.rb', line 39

# Starts the plugin and, unless @emit_only is set, builds the Redis client
# stored in @redis.
#
# With no entries in @redis_sentinels a direct connection to
# @redis_host:@redis_port is configured; otherwise the client is configured
# with the sentinel list and the master group name.
#
# @return [Object, nil] the new Redis client, or nil when @emit_only is set
def start
  super
  return if @emit_only

  @redis =
    if @redis_sentinels.empty?
      Redis.new(
        host: @redis_host,
        port: @redis_port,
        password: @redis_password,
        thread_safe: true,
        db: @redis_db_number
      )
    else
      # Every sentinel host is paired with the same @redis_port.
      # NOTE(review): @redis_db_number is not passed in this branch — confirm
      # that is intended.
      sentinel_nodes = @redis_sentinels.map do |sentinel_host|
        { host: sentinel_host, port: @redis_port }
      end
      Redis.new(
        url: "redis://#{@redis_master_group_name}",
        sentinels: sentinel_nodes,
        password: @redis_password,
        thread_safe: true,
        role: :master
      )
    end
end

#write(chunk) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/out_eventcounter.rb', line 76

# Aggregates the buffered events of a chunk and flushes the totals.
#
# Each chunk line is a JSON pair [tag, event] as produced by #format.
# Occurrences are tallied per tag and event. Unless @emit_only is set, the
# totals are added to the Redis hash "<@redis_output_key>:<tag>" with
# HINCRBY inside one pipeline. When @emit_only or @debug_emit is set, one
# event per tag is emitted to @emit_to carrying { tag => { event => count } }.
#
# @param chunk [Object] buffer chunk; must respond to #open and yield an IO
# @return [Object] not meaningful to callers
def write(chunk)
  counts = Hash.new { |hash, key| hash[key] = Hash.new(0) }
  chunk.open do |io|
    io.read.split("\n").each do |line|
      tag, event = JSON.parse(line)
      counts[tag][event] += 1
    end
  end

  unless @emit_only
    # Issue commands on the object yielded by #pipelined: calling @redis
    # inside the block is deprecated since redis-rb 4.6 and is not pipelined
    # in redis-rb 5.
    @redis.pipelined do |pipeline|
      counts.each do |tag, events|
        redis_key = [@redis_output_key, tag].join(':')
        events.each do |event, count|
          pipeline.hincrby(redis_key, event, count)
        end
      end
    end
  end

  if @emit_only || @debug_emit
    counts.each do |tag, events|
      # NOTE(review): a Time object is passed as the event time — confirm the
      # router accepts it (an Integer or EventTime may be expected).
      router.emit(@emit_to, Time.now, tag => events)
    end
  end
end