Class: Fluent::CombinerOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_combiner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeCombinerOutput

Returns a new instance of CombinerOutput.



15
16
17
18
# File 'lib/fluent/plugin/out_combiner.rb', line 15

def initialize
  super
  require 'pathname'
end

Instance Attribute Details

#histObject

Returns the value of attribute hist.



12
13
14
# File 'lib/fluent/plugin/out_combiner.rb', line 12

def hist
  @hist
end

#last_checkedObject

Returns the value of attribute last_checked.



13
14
15
# File 'lib/fluent/plugin/out_combiner.rb', line 13

def last_checked
  @last_checked
end

#tickObject

Returns the value of attribute tick.



13
14
15
# File 'lib/fluent/plugin/out_combiner.rb', line 13

def tick
  @tick
end

Instance Method Details

#add_prefix(tag = "") ⇒ Object

Utils



120
121
122
123
# File 'lib/fluent/plugin/out_combiner.rb', line 120

def add_prefix(tag="")
  return @tag_prefix if tag.empty?
  return @tag_prefix_string + tag
end

#clearObject



115
116
117
# File 'lib/fluent/plugin/out_combiner.rb', line 115

def clear
  @hist = initialize_hist(@hist.keys.dup)
end

#configure(conf) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/fluent/plugin/out_combiner.rb', line 20

def configure(conf)
  super

  @tick = @count_interval.to_i if @count_interval
  @tag_str = @tag + '.' if @tag
  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.' 
    @remove_prefix_length = @remove_prefix_string.length
  end

  @hist = initialize_hist
end

#countup(tag, keys) ⇒ Object



107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_combiner.rb', line 107

def countup(tag, keys)
  if keys.is_a?(Array) 
    keys.each {|k| increment(tag, k)}
  elsif keys.is_a?(String)
    increment(tag, keys)
  end
end

#emit(tag, es, chain) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_combiner.rb', line 71

def emit(tag, es, chain)

  es.each do |time, record|
    keys = record[@count_key]
    countup(tag, keys)
  end

  chain.next
end

#flushObject



46
47
48
49
# File 'lib/fluent/plugin/out_combiner.rb', line 46

def flush
  flushed, @hist = @hist, initialize_hist(@hist.keys.dup)
  generate_output(flushed)
end

#flush_emitObject



51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_combiner.rb', line 51

def flush_emit
  flushed = flush
  now = Fluent::Engine.now
  flushed.each do |tag, message|
    Fluent::Engine.emit(tag, now, message)
  end
end

#generate_output(data) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_combiner.rb', line 59

def generate_output(data)
  output = {}
  data.each do |tag, hist|
    if @tag
      output[@tag] = hist
    else
      output[add_prefix(stripped_tag(tag))] = hist
    end
  end
  output
end

#increment(tag, key) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_combiner.rb', line 94

def increment(tag, key)
  @hist[tag] ||= {:hist => {}, :sum => 0, :len => 0}
  if @hist[tag][:hist].key? key
    @hist[tag][:hist][key] += 1
    @hist[tag][:sum] += 1
  else
    @hist[tag][:hist][key] = 1
    @hist[tag][:sum] += 1
    @hist[tag][:len] += 1
  end
  @hist
end

#initialize_hist(tags = nil) ⇒ Object

Combiner’s main methods



83
84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/out_combiner.rb', line 83

def initialize_hist(tags=nil)
  hist = {}
  if tags
    tags.each do |tag|
      hist[tag] = {:hist => {}, :sum => 0, :len => 0}
    end
  end
  hist
end

#shutdownObject



40
41
42
43
44
# File 'lib/fluent/plugin/out_combiner.rb', line 40

def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#startObject

Fluent::Output main methods



35
36
37
38
# File 'lib/fluent/plugin/out_combiner.rb', line 35

def start
  super
  start_watch
end

#stripped_tag(tag) ⇒ Object



125
126
127
128
129
130
# File 'lib/fluent/plugin/out_combiner.rb', line 125

def stripped_tag(tag)
  return tag unless @input_tag_remove_prefix
  return tag[@remove_prefix_length..-1] if tag.start_with?(@remove_prefix_string) && tag.length > @remove_prefix_length
  return "" if tag == @input_tag_remove_prefix
  return tag
end