Class: Fluent::AnomalyDetectOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::AnomalyDetectOutput
- Defined in:
- lib/fluent/plugin/out_anomalydetect.rb
Instance Attribute Summary collapse
-
#outlier ⇒ Object
Returns the value of attribute outlier.
-
#outlier_buf ⇒ Object
Returns the value of attribute outlier_buf.
-
#record_count ⇒ Object
Returns the value of attribute record_count.
-
#records ⇒ Object
Returns the value of attribute records.
-
#score ⇒ Object
Returns the value of attribute score.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush ⇒ Object
- #flush_emit(step) ⇒ Object
- #init_records ⇒ Object
- #load_from_file ⇒ Object
- #push_records(records) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_watch ⇒ Object
- #store_to_file ⇒ Object
- #tick_time(time) ⇒ Object
- #watch ⇒ Object
Instance Attribute Details
#outlier ⇒ Object
Returns the value of attribute outlier.
19 20 21 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 19 def outlier @outlier end |
#outlier_buf ⇒ Object
Returns the value of attribute outlier_buf.
23 24 25 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 23 def outlier_buf @outlier_buf end |
#record_count ⇒ Object
Returns the value of attribute record_count.
21 22 23 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 21 def record_count @record_count end |
#records ⇒ Object
Returns the value of attribute records.
25 26 27 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 25 def records @records end |
#score ⇒ Object
Returns the value of attribute score.
20 21 22 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 20 def score @score end |
Instance Method Details
#configure(conf) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 27 def configure (conf) super unless 0 < @outlier_discount and @outlier_discount < 1 raise Fluent::ConfigError, "discount ratio should be between (0, 1)" end unless 0 < @score_discount and @score_discount < 1 raise Fluent::ConfigError, "discount ratio should be between (0, 1)" end if @outlier_term < 1 raise Fluent::ConfigError, "outlier term should be greater than 0" end if @score_term < 1 raise Fluent::ConfigError, "score term should be greater than 0" end if @smooth_term < 1 raise Fluent::ConfigError, "smooth term should be greater than 0" end if @tick < 1 raise Fluent::ConfigError, "tick timer should be greater than 1 sec" end if @store_file f = Pathname.new(@store_file) if (f.exist? && !f.writable_real?) || (!f.exist? && !f.parent.writable_real?) raise Fluent::ConfigError, "#{@store_file} is not writable" end end @outlier_buf = [] @outlier = ChangeFinder.new(@outlier_term, @outlier_discount) @score = ChangeFinder.new(@score_term, @score_discount) @mutex = Mutex.new @record_count = @target.nil? end |
#emit(tag, es, chain) ⇒ Object
196 197 198 199 200 201 202 203 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 196 def emit(tag, es, chain) records = es.map { |time, record| record } push_records records chain.next rescue => e $log.warn "anomalydetect: #{e.class} #{e.} #{e.backtrace.first}" end |
#flush ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 159 def flush flushed, @records = @records, init_records val = if @record_count flushed.size else filtered = flushed.map {|record| record[@target] }.compact return nil if filtered.empty? filtered.inject(:+).to_f / filtered.size end outlier = @outlier.next(val) @outlier_buf.push outlier @outlier_buf.shift if @outlier_buf.size > @smooth_term outlier_avg = @outlier_buf.empty? ? 0.0 : @outlier_buf.inject(:+).to_f / @outlier_buf.size score = @score.next(outlier_avg) $log.debug "out_anomalydetect:#{Thread.current.object_id} flushed:#{flushed} val:#{val} outlier:#{outlier} outlier_buf:#{@outlier_buf} score:#{score}" if @threshold < 0 or (@threshold >= 0 and score > @threshold) {"outlier" => outlier, "score" => score, "target" => val} else nil end end |
#flush_emit(step) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 152 def flush_emit(step) output = flush if output Fluent::Engine.emit(@tag, Fluent::Engine.now, output) end end |
#init_records ⇒ Object
148 149 150 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 148 def init_records @records = [] end |
#load_from_file ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 82 def load_from_file return unless @store_file f = Pathname.new(@store_file) return unless f.exist? begin f.open('rb') do |f| stored = Marshal.load(f) if (( stored[:outlier_term] == @outlier_term ) && ( stored[:outlier_discount] == @outlier_discount ) && ( stored[:score_term] == @score_term ) && ( stored[:score_discount] == @score_discount ) && ( stored[:smooth_term] == @smooth_term )) then @outlier = stored[:outlier] @outlier_buf = stored[:outlier_buf] @score = stored[:score] else $log.warn "anomalydetect: configuration param was changed. ignore stored data" end end rescue => e $log.warn "anomalydetect: Can't load store_file #{e}" end end |
#push_records(records) ⇒ Object
190 191 192 193 194 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 190 def push_records(records) @mutex.synchronize do @records.concat(records) end end |
#shutdown ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 71 def shutdown super if @watcher @watcher.terminate @watcher.join end store_to_file rescue => e $log.warn "anomalydetect: #{e.class} #{e.} #{e.backtrace.first}" end |
#start ⇒ Object
62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 62 def start super load_from_file init_records start_watch rescue => e $log.warn "anomalydetect: #{e.class} #{e.} #{e.backtrace.first}" end |
#start_watch ⇒ Object
128 129 130 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 128 def start_watch @watcher = Thread.new(&method(:watch)) end |
#store_to_file ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 108 def store_to_file return unless @store_file begin Pathname.new(@store_file).open('wb') do |f| Marshal.dump({ :outlier => @outlier, :outlier_buf => @outlier_buf, :score => @score, :outlier_term => @outlier_term, :outlier_discount => @outlier_discount, :score_term => @score_term, :score_discount => @score_discount, :smooth_term => @smooth_term, }, f) end rescue => e $log.warn "anomalydetect: Can't write store_file #{e}" end end |
#tick_time(time) ⇒ Object
186 187 188 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 186 def tick_time(time) (time - time % @tick).to_s end |
#watch ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/plugin/out_anomalydetect.rb', line 132 def watch @last_checked = Fluent::Engine.now loop do begin sleep 0.5 now = Fluent::Engine.now if now - @last_checked >= @tick flush_emit(now - @last_checked) @last_checked = now end rescue => e $log.warn "anomalydetect: #{e.class} #{e.} #{e.backtrace.first}" end end end |