Class: LogStash::Codecs::Multiline

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/multiline.rb

Instance Method Summary collapse

Instance Method Details

#accept(listener) ⇒ Object



187
188
189
190
191
192
193
194
# File 'lib/logstash/codecs/multiline.rb', line 187

# Decodes the data carried by +listener+ and hands every resulting event to
# the listener selected by #what_based_listener.
#
# Before decoding, references to the upstream state are memoized: the
# listener seen on the previous call (or +listener+ itself when there is
# none yet) is kept in @previous_listener, and +listener+ becomes
# @last_seen_listener.
#
# @param listener [#data] object whose +data+ is passed to #decode
# @return [Object] whatever #decode returns
def accept(listener)
  @previous_listener, @last_seen_listener = (@last_seen_listener || listener), listener
  decode(listener.data) { |event| what_based_listener.process_event(event) }
end

#auto_flush(listener = @last_seen_listener) ⇒ Object



231
232
233
234
235
236
237
# File 'lib/logstash/codecs/multiline.rb', line 231

# Flushes the buffered lines and passes each flushed event to +listener+.
# Does nothing when no listener is available.
#
# @param listener [#process_event, nil] receiver of the flushed event;
#   defaults to the listener stored in @last_seen_listener
# @return [Object, nil] nil when +listener+ is nil, otherwise the result of #flush
def auto_flush(listener = @last_seen_listener)
  flush { |event| listener.process_event(event) } unless listener.nil?
end

#auto_flush_active? ⇒ Boolean

Returns:

  • (Boolean)


293
294
295
# File 'lib/logstash/codecs/multiline.rb', line 293

# Tells whether an auto-flush interval has been set.
#
# @return [Boolean] true when @auto_flush_interval is anything other than nil
def auto_flush_active?
  interval_unset = @auto_flush_interval.nil?
  !interval_unset
end

#auto_flush_runner ⇒ Object



297
298
299
# File 'lib/logstash/codecs/multiline.rb', line 297

# Returns the runner stored in @auto_flush_runner, or a freshly built
# AutoFlushUnset stand-in when none has been assigned.
#
# @return [Object] the stored runner or a new AutoFlushUnset
def auto_flush_runner
  return @auto_flush_runner if @auto_flush_runner

  AutoFlushUnset.new(nil, nil)
end

#buffer(text) ⇒ Object

def decode



209
210
211
212
# File 'lib/logstash/codecs/multiline.rb', line 209

# Appends +text+ to the pending lines and adds its size in bytes to the
# running byte count.
#
# @param text [String] line to keep until the next flush
# @return [Array] the buffer after the line has been appended
def buffer(text)
  @buffer_bytes = @buffer_bytes + text.bytesize
  @buffer << text
end

#buffer_over_limits? ⇒ Boolean

Returns:

  • (Boolean)


280
281
282
# File 'lib/logstash/codecs/multiline.rb', line 280

# Tells whether the buffer has hit either configured limit. The line limit
# is checked first; the byte limit is only consulted when the line limit has
# not been exceeded.
#
# @return [Boolean]
def buffer_over_limits?
  lines_exceeded = over_maximum_lines?
  lines_exceeded || over_maximum_bytes?
end

#close ⇒ Object

def encode



289
290
291
# File 'lib/logstash/codecs/multiline.rb', line 289

# Stops the auto-flush runner returned by #auto_flush_runner.
#
# @return [Object] whatever the runner's +stop+ returns
def close
  runner = auto_flush_runner
  runner.stop
end

#decode(text, &block) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/logstash/codecs/multiline.rb', line 196

# Converts +text+ with the charset converter, splits it on newlines and
# feeds each line, together with its (optionally negated) pattern-match
# outcome, to the handler stored in @handler.
#
# @param text [String] raw input, possibly spanning several lines
# @param block [Proc] forwarded to the handler for every line
# @return [Array<String>] the lines obtained from splitting the converted text
def decode(text, &block)
  converted = @converter.convert(text)
  converted.split("\n").each do |line|
    grok_match = @grok.match(line)
    @logger.debug("Multiline", :pattern => @pattern, :text => line,
                  :match => (grok_match != false), :negate => @negate)

    # Apply the negate option: a hit counts only when @negate is off,
    # a miss counts only when @negate is on.
    matched = grok_match ? !@negate : @negate
    @handler.call(line, matched, &block)
  end
end

#do_next(text, matched, &block) ⇒ Object



260
261
262
263
264
# File 'lib/logstash/codecs/multiline.rb', line 260

# Handler used when lines are joined with the one that follows: the line is
# buffered first, the auto-flush runner is started, and the buffer is then
# flushed when the line did not match or a buffer limit has been reached.
#
# @param text [String] the current line
# @param matched [Boolean] match outcome computed by #decode
# @param block [Proc] forwarded to #flush
# @return [Object, nil] the result of #flush, or nil when nothing was flushed
def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  return if matched && !buffer_over_limits?

  flush(&block)
end

#do_previous(text, matched, &block) ⇒ Object



266
267
268
269
270
# File 'lib/logstash/codecs/multiline.rb', line 266

# Handler used when lines are joined with the one that precedes them: the
# existing buffer is flushed first when the line did not match or a buffer
# limit has been reached, then the auto-flush runner is started and the line
# is buffered.
#
# @param text [String] the current line
# @param matched [Boolean] match outcome computed by #decode
# @param block [Proc] forwarded to #flush
# @return [Object] the result of #buffer
def do_previous(text, matched, &block)
  flush_needed = !matched || buffer_over_limits?
  flush(&block) if flush_needed
  auto_flush_runner.start
  buffer(text)
end

#doing_previous? ⇒ Boolean

Returns:

  • (Boolean)


252
253
254
# File 'lib/logstash/codecs/multiline.rb', line 252

# Tells whether the codec is configured with @what set to "previous".
#
# @return [Boolean]
def doing_previous?
  "previous" == @what
end

#encode(event) ⇒ Object



284
285
286
287
# File 'lib/logstash/codecs/multiline.rb', line 284

# Encoding is a pass-through: the event is handed to the @on_event callback
# unchanged, as both of its arguments.
#
# @param event [Object] the event to emit
# @return [Object] whatever the @on_event callback returns
def encode(event)
  @on_event.(event, event)
end

#flush(&block) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/logstash/codecs/multiline.rb', line 214

# Merges the buffered lines into one event and yields it to the given block.
# The buffer is cleared only when the block returns without raising.
#
# Nothing happens when no block is given or when the buffer has no lines.
#
# @param block [Proc] downstream consumer of the merged event
# @yieldparam event [Object] the result of #merge_events
# @return [Object, nil] the result of #reset_buffer after a successful
#   flush, otherwise nil
def flush(&block)
  return unless block_given? && @buffer.any?

  merged = merge_events
  begin
    yield merged
  rescue ::Exception => e
    # Everything has to be rescued here.
    # Likeliest cause: backpressure or a timeout signalled by exception.
    # Nothing can be done except leaving the data in the buffer for the
    # next flush, if there is one.
    @logger.error("Multiline: flush downstream error", :exception => e)
    nil
  else
    reset_buffer
  end
end

#merge_events ⇒ Object



239
240
241
242
243
244
245
# File 'lib/logstash/codecs/multiline.rb', line 239

# Builds a single event from the buffered lines: the lines are joined with
# NL into the "message" field and @time is used as the timestamp.
#
# The event is tagged with @multiline_tag when that tag is non-empty and
# more than one line was joined, and with a dedicated tag for each buffer
# limit that has been reached.
#
# @return [LogStash::Event] the merged event
def merge_events
  merged = LogStash::Event.new(
    LogStash::Event::TIMESTAMP => @time,
    "message" => @buffer.join(NL)
  )
  joined_several_lines = !@multiline_tag.empty? && @buffer.size > 1
  merged.tag(@multiline_tag) if joined_several_lines
  merged.tag("multiline_codec_max_bytes_reached") if over_maximum_bytes?
  merged.tag("multiline_codec_max_lines_reached") if over_maximum_lines?
  merged
end

#over_maximum_bytes? ⇒ Boolean

Returns:

  • (Boolean)


276
277
278
# File 'lib/logstash/codecs/multiline.rb', line 276

# Tells whether the buffered byte count has reached or exceeded @max_bytes.
#
# @return [Boolean]
def over_maximum_bytes?
  buffered = @buffer_bytes
  limit = @max_bytes
  buffered >= limit
end

#over_maximum_lines? ⇒ Boolean

Returns:

  • (Boolean)


272
273
274
# File 'lib/logstash/codecs/multiline.rb', line 272

# Tells whether the number of buffered lines is strictly greater than
# @max_lines.
#
# @return [Boolean]
def over_maximum_lines?
  buffered_lines = @buffer.size
  buffered_lines > @max_lines
end

#register ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/logstash/codecs/multiline.rb', line 144

# Prepares the codec for use: loads the grok pattern files, compiles the
# configured pattern, empties the line buffer, selects the per-line handler
# and sets up charset conversion. When an auto-flush interval is set, the
# auto-flush runner is created as well.
#
# @return [Object, nil] the AutoFlush runner when @auto_flush_interval is
#   set, otherwise nil
def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  # The core pattern location is searched before any configured directory.
  core_patterns = [LogStash::Patterns::Core.path]

  @grok = Grok.new

  @patterns_dir = core_patterns + @patterns_dir
  @patterns_dir.each do |path|
    # A directory contributes every entry directly inside it; any other
    # path is handed to Dir.glob unchanged.
    glob = ::File.directory?(path) ? ::File.join(path, "*") : path

    Dir.glob(glob).each do |file|
      @logger.debug("Grok loading patterns from file", :path => file)
      @grok.add_patterns_from_file(file)
    end
  end

  @grok.compile(@pattern)
  @logger.trace("Registered multiline plugin", :type => @type, :config => @config)

  reset_buffer

  # Look up the handler method named after @what (do_<what>) once, so that
  # #decode can simply call it for every line.
  @handler = method(:"do_#{@what}")

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  # The runner is only created here; it will start on first decode.
  @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) if @auto_flush_interval
end

#reset_buffer ⇒ Object



247
248
249
250
# File 'lib/logstash/codecs/multiline.rb', line 247

# Discards all pending lines and zeroes the running byte count.
#
# @return [Integer] the new byte count, which is always 0
def reset_buffer
  @buffer_bytes = 0
  @buffer = []
  @buffer_bytes
end

#use_mapper_auto_flush ⇒ Object

def register



181
182
183
184
185
# File 'lib/logstash/codecs/multiline.rb', line 181

# When auto-flush is active, replaces the codec's own runner with an
# AutoFlushUnset stand-in and coerces @auto_flush_interval to a Float.
# Does nothing when auto-flush is not active.
#
# @return [Float, nil] the coerced interval, or nil when auto-flush is inactive
def use_mapper_auto_flush
  if auto_flush_active?
    @auto_flush_runner = AutoFlushUnset.new(nil, nil)
    @auto_flush_interval = @auto_flush_interval.to_f
  end
end

#what_based_listener ⇒ Object



256
257
258
# File 'lib/logstash/codecs/multiline.rb', line 256

# Picks the listener that should receive decoded events: the previously
# seen listener when #doing_previous? is true, otherwise the most recently
# seen one.
#
# @return [Object, nil] @previous_listener or @last_seen_listener
def what_based_listener
  return @previous_listener if doing_previous?

  @last_seen_listener
end