Class: LogStash::Codecs::Joinlines
- Inherits:
-
Base
- Object
- Base
- LogStash::Codecs::Joinlines
- Defined in:
- lib/logstash/codecs/joinlines.rb
Instance Method Summary collapse
- #accept(listener) ⇒ Object
- #auto_flush(listener = @last_seen_listener) ⇒ Object
- #auto_flush_active? ⇒ Boolean
- #auto_flush_runner ⇒ Object
-
#buffer(text) ⇒ Object
def decode.
- #buffer_over_limits? ⇒ Boolean
-
#close ⇒ Object
def encode.
- #decode(text, &block) ⇒ Object
- #do_next(text, matched, &block) ⇒ Object
- #do_previous(text, matched, &block) ⇒ Object
- #doing_previous?(what) ⇒ Boolean
- #encode(event) ⇒ Object
- #flush(&block) ⇒ Object
- #initialize_copy(source) ⇒ Object
-
#internal_decode(text, &block) ⇒ Object
private.
- #merge_events ⇒ Object
- #over_maximum_bytes? ⇒ Boolean
- #over_maximum_lines? ⇒ Boolean
- #register ⇒ Object
- #reset_buffer ⇒ Object
-
#use_mapper_auto_flush ⇒ Object
def register.
- #what_based_listener(what) ⇒ Object
- #zip_config ⇒ Object
Instance Method Details
#accept(listener) ⇒ Object
144 145 146 147 148 149 150 151 152 |
# File 'lib/logstash/codecs/joinlines.rb', line 144 def accept(listener) # memoize references to listener that holds upstream state @previous_listener = @last_seen_listener || listener @last_seen_listener = listener internal_decode(listener.data) do |event,what| what_based_listener(what).process_event(event) end end |
#auto_flush(listener = @last_seen_listener) ⇒ Object
226 227 228 229 230 231 232 |
# File 'lib/logstash/codecs/joinlines.rb', line 226 def auto_flush(listener = @last_seen_listener) return if listener.nil? flush do |event| listener.process_event(event) end end |
#auto_flush_active? ⇒ Boolean
288 289 290 |
# File 'lib/logstash/codecs/joinlines.rb', line 288 def auto_flush_active? !@auto_flush_interval.nil? end |
#auto_flush_runner ⇒ Object
292 293 294 |
# File 'lib/logstash/codecs/joinlines.rb', line 292 def auto_flush_runner @auto_flush_runner || AutoFlushUnset.new(nil, nil) end |
#buffer(text) ⇒ Object
def decode
204 205 206 207 |
# File 'lib/logstash/codecs/joinlines.rb', line 204 def buffer(text) @buffer_bytes += text.bytesize @buffer.push(text) end |
#buffer_over_limits? ⇒ Boolean
275 276 277 |
# File 'lib/logstash/codecs/joinlines.rb', line 275 def buffer_over_limits? over_maximum_lines? || over_maximum_bytes? end |
#close ⇒ Object
def encode
284 285 286 |
# File 'lib/logstash/codecs/joinlines.rb', line 284 def close auto_flush_runner.stop end |
#decode(text, &block) ⇒ Object
198 199 200 201 202 |
# File 'lib/logstash/codecs/joinlines.rb', line 198 def decode(text, &block) internal_decode(text) do |event,what| yield(event) end end |
#do_next(text, matched, &block) ⇒ Object
255 256 257 258 259 |
# File 'lib/logstash/codecs/joinlines.rb', line 255 def do_next(text, matched, &block) buffer(text) auto_flush_runner.start flush(&block) if !matched || buffer_over_limits? end |
#do_previous(text, matched, &block) ⇒ Object
261 262 263 264 265 |
# File 'lib/logstash/codecs/joinlines.rb', line 261 def do_previous(text, matched, &block) flush(&block) if !matched || buffer_over_limits? auto_flush_runner.start buffer(text) end |
#doing_previous?(what) ⇒ Boolean
247 248 249 |
# File 'lib/logstash/codecs/joinlines.rb', line 247 def doing_previous?(what) what != "next" end |
#encode(event) ⇒ Object
279 280 281 282 |
# File 'lib/logstash/codecs/joinlines.rb', line 279 def encode(event) # Nothing to do. @on_event.call(event, event) end |
#flush(&block) ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/logstash/codecs/joinlines.rb', line 209 def flush(&block) if block_given? && @buffer.any? no_error = true events = merge_events begin yield events rescue ::Exception => e # need to rescue everything # likliest cause: backpressure or timeout by exception # can't really do anything but leave the data in the buffer for next time if there is one @logger.error("Joinlines: flush downstream error", :exception => e) no_error = false end reset_buffer if no_error end end |
#initialize_copy(source) ⇒ Object
296 297 298 299 |
# File 'lib/logstash/codecs/joinlines.rb', line 296 def initialize_copy(source) super register end |
#internal_decode(text, &block) ⇒ Object
private
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/logstash/codecs/joinlines.rb', line 159 def internal_decode(text, &block) do_flush = false text = @converter.convert(text) text.split("\n").each do |line| matched = false zip_config.each do |pattern,what,negate,grok,handler| match = grok.match(line) @logger.debug("Joinlines", :pattern => pattern, :text => line, :match => (match != false), :negate => negate) # Add negate option match = (match and !negate) || (!match and negate) if match do_flush = (what == "next" and @matching != "next") matched = true @matching = what break end end if !matched do_flush = (@matching != "next") @matching = "" end if do_flush flush do |event| yield(event,@matching) end do_flush = false end auto_flush_runner.start buffer(line) end end |
#merge_events ⇒ Object
234 235 236 237 238 239 240 |
# File 'lib/logstash/codecs/joinlines.rb', line 234 def merge_events event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL)) event.tag @multiline_tag if !@multiline_tag.empty? && @buffer.size > 1 event.tag "joinlines_codec_max_bytes_reached" if over_maximum_bytes? event.tag "joinlines_codec_max_lines_reached" if over_maximum_lines? event end |
#over_maximum_bytes? ⇒ Boolean
271 272 273 |
# File 'lib/logstash/codecs/joinlines.rb', line 271 def over_maximum_bytes? @buffer_bytes >= @max_bytes end |
#over_maximum_lines? ⇒ Boolean
267 268 269 |
# File 'lib/logstash/codecs/joinlines.rb', line 267 def over_maximum_lines? @buffer.size > @max_lines end |
#register ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/logstash/codecs/joinlines.rb', line 91 def register require "grok-pure" # rubygem 'jls-grok' require 'logstash/patterns/core' @matching = "" # Detect if we are running from a jarfile, pick the right path. patterns_path = [] patterns_path += [LogStash::Patterns::Core.path] @patterns_dir = patterns_path.to_a + @patterns_dir @groks = [] @handlers = [] @patterns.zip(@what).each do |pattern,what| grok = Grok.new @patterns_dir.each do |path| if ::File.directory?(path) path = ::File.join(path, "*") end Dir.glob(path).each do |file| @logger.debug("Grok loading patterns from file", :path => file) grok.add_patterns_from_file(file) end end grok.compile(pattern) handler = method("do_#{what}".to_sym) @groks.push(grok) @handlers.push(handler) end @logger.trace("Registered joinlines plugin", :type => @type, :config => @config) reset_buffer @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger if @auto_flush_interval # will start on first decode @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) end end |
#reset_buffer ⇒ Object
242 243 244 245 |
# File 'lib/logstash/codecs/joinlines.rb', line 242 def reset_buffer @buffer = [] @buffer_bytes = 0 end |
#use_mapper_auto_flush ⇒ Object
def register
138 139 140 141 142 |
# File 'lib/logstash/codecs/joinlines.rb', line 138 def use_mapper_auto_flush return unless auto_flush_active? @auto_flush_runner = AutoFlushUnset.new(nil, nil) @auto_flush_interval = @auto_flush_interval.to_f end |
#what_based_listener(what) ⇒ Object
251 252 253 |
# File 'lib/logstash/codecs/joinlines.rb', line 251 def what_based_listener(what) doing_previous?(what) ? @previous_listener : @last_seen_listener end |
#zip_config ⇒ Object
154 155 156 |
# File 'lib/logstash/codecs/joinlines.rb', line 154 def zip_config @patterns.zip(@what, @negate, @groks, @handlers) end |