Class: LogStash::Codecs::Multiline

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/multiline.rb

Instance Method Summary collapse

Instance Method Details

#accept(listener) ⇒ Object



180
181
182
183
184
185
186
187
# File 'lib/logstash/codecs/multiline.rb', line 180

# Decodes the data carried by +listener+ and passes every resulting event to
# the listener returned by #what_based_listener.
#
# Two listener references are tracked across calls: @previous_listener is the
# listener seen on the prior call (or +listener+ itself on the first call),
# and @last_seen_listener is always the current +listener+.
#
# @param listener [#data] object whose #data is handed to #decode
# @return [Object] whatever #decode returns
def accept(listener)
  # Both right-hand values are evaluated before either assignment, so
  # @previous_listener still receives the listener from the prior call.
  @previous_listener, @last_seen_listener = (@last_seen_listener || listener), listener
  decode(listener.data) { |event| what_based_listener.process_event(event) }
end

#auto_flush(listener = @last_seen_listener) ⇒ Object



224
225
226
227
228
229
230
# File 'lib/logstash/codecs/multiline.rb', line 224

# Flushes the buffered lines and passes the resulting event to +listener+.
# Does nothing (and returns nil) when there is no listener.
#
# @param listener [#process_event, nil] receiver of the flushed event;
#   defaults to the listener most recently stored in @last_seen_listener
# @return [Object, nil] the result of #flush, or nil when +listener+ is nil
def auto_flush(listener = @last_seen_listener)
  flush { |event| listener.process_event(event) } unless listener.nil?
end

#auto_flush_active? ⇒ Boolean

Returns:

  • (Boolean)


286
287
288
# File 'lib/logstash/codecs/multiline.rb', line 286

# Reports whether an auto-flush interval has been set.
#
# @return [Boolean] false when @auto_flush_interval is nil, true otherwise
def auto_flush_active?
  return false if @auto_flush_interval.nil?

  true
end

#auto_flush_runner ⇒ Object



290
291
292
# File 'lib/logstash/codecs/multiline.rb', line 290

# Returns the runner stored in @auto_flush_runner. When none is set, a new
# AutoFlushUnset (built with two nil arguments) is returned instead; that
# fallback is not stored, so each call without a runner builds a new one.
#
# @return [Object] the stored runner or a fresh AutoFlushUnset
def auto_flush_runner
  return @auto_flush_runner if @auto_flush_runner

  AutoFlushUnset.new(nil, nil)
end

#buffer(text) ⇒ Object

Appends the given text to the pending-line buffer and adds its byte size to the running byte total.



202
203
204
205
# File 'lib/logstash/codecs/multiline.rb', line 202

# Adds +text+ to the pending lines and accounts for its size in bytes.
#
# @param text [String] line to append to @buffer
# @return [Array] the @buffer array after the append
def buffer(text)
  # Byte accounting happens first; the append is the method's return value.
  @buffer_bytes = @buffer_bytes + text.bytesize
  @buffer << text
end

#buffer_over_limits? ⇒ Boolean

Returns:

  • (Boolean)


273
274
275
# File 'lib/logstash/codecs/multiline.rb', line 273

# Reports whether the buffer has hit either configured limit.
#
# @return [Boolean] result of #over_maximum_lines? when truthy, otherwise the
#   result of #over_maximum_bytes?
def buffer_over_limits?
  lines_exceeded = over_maximum_lines?
  # The byte check only runs when the line check did not already trip.
  lines_exceeded || over_maximum_bytes?
end

#close ⇒ Object

Stops the auto-flush runner.



282
283
284
# File 'lib/logstash/codecs/multiline.rb', line 282

# Stops the current auto-flush runner.
#
# @return [Object] whatever the runner's #stop returns
def close
  runner = auto_flush_runner
  runner.stop
end

#decode(text, &block) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/logstash/codecs/multiline.rb', line 189

# Converts +text+ with @converter, splits it on "\n" and passes each line,
# together with its match verdict, to @handler.
#
# The verdict is truthy when the line matches @grok and @negate is falsy, or
# when the line does not match and @negate is truthy.
#
# @param text [String] raw input, possibly containing several lines
# @param block [Proc] forwarded untouched to @handler on every line
# @return [Array<String>] the lines obtained from the split
def decode(text, &block)
  converted = @converter.convert(text)
  converted.split("\n").each do |line|
    grok_match = @grok.match(line)
    @logger.debug("Multiline", :pattern => @pattern, :text => line,
                  :match => !grok_match.nil?, :negate => @negate)

    # Apply the negate option: flip the verdict when @negate is set.
    matched = (grok_match && !@negate) || (!grok_match && @negate)
    @handler.call(line, matched, &block)
  end
end

#do_next(text, matched, &block) ⇒ Object



253
254
255
256
257
# File 'lib/logstash/codecs/multiline.rb', line 253

# Line handler that buffers +text+ first and then decides whether to flush:
# the line is appended, the auto-flush runner is started, and the buffer is
# flushed when the line did not match or the buffer is over its limits.
#
# @param text [String] the line being processed
# @param matched [Boolean] match verdict for +text+
# @param block [Proc] forwarded to #flush
# @return [Object, nil] result of #flush, or nil when no flush happened
def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  # Keep accumulating only while lines match and the limits are respected.
  return if matched && !buffer_over_limits?

  flush(&block)
end

#do_previous(text, matched, &block) ⇒ Object



259
260
261
262
263
# File 'lib/logstash/codecs/multiline.rb', line 259

# Line handler that decides whether to flush before buffering +text+: the
# existing buffer is flushed when the line did not match or the buffer is
# over its limits, then the auto-flush runner is started and the line is
# appended.
#
# @param text [String] the line being processed
# @param matched [Boolean] match verdict for +text+
# @param block [Proc] forwarded to #flush
# @return [Object] whatever #buffer returns
def do_previous(text, matched, &block)
  # Skip the flush only while lines match and the limits are respected.
  flush(&block) unless matched && !buffer_over_limits?
  auto_flush_runner.start
  buffer(text)
end

#doing_previous? ⇒ Boolean

Returns:

  • (Boolean)


245
246
247
# File 'lib/logstash/codecs/multiline.rb', line 245

# Reports whether the codec is configured with @what set to "previous".
#
# @return [Boolean] true only when @what equals the string "previous"
def doing_previous?
  mode = @what
  mode == "previous"
end

#encode(event) ⇒ Object



277
278
279
280
# File 'lib/logstash/codecs/multiline.rb', line 277

# Passes +event+ straight through to the @on_event callback; no encoding work
# is performed. The event is supplied as both callback arguments.
#
# @param event [Object] event to hand to the callback
# @return [Object] whatever the callback returns
def encode(event)
  callback = @on_event
  callback.call(event, event)
end

#flush(&block) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/logstash/codecs/multiline.rb', line 207

# Merges the buffered lines into one event, yields it to the given block and
# clears the buffer once the block has returned normally.
#
# Nothing happens when no block is given or @buffer has no entries. If the
# block raises, the error is logged and the buffer is left untouched so the
# data can be flushed again later.
#
# @yieldparam event [Object] the result of #merge_events
# @return [Object, nil] the result of #reset_buffer after a successful flush,
#   nil otherwise
def flush(&block)
  return unless block_given? && @buffer.any?

  merged = merge_events
  begin
    yield merged
  rescue ::Exception => e
    # Everything is rescued on purpose: the likeliest cause is downstream
    # backpressure or a timeout signalled by exception, and all that can be
    # done is to keep the data buffered for the next attempt, if there is one.
    @logger.error("Multiline: flush downstream error", :exception => e)
    nil
  else
    reset_buffer
  end
end

#merge_events ⇒ Object



232
233
234
235
236
237
238
# File 'lib/logstash/codecs/multiline.rb', line 232

# Builds a single LogStash::Event from the buffered lines, joined with NL
# into the "message" field, and tags it as needed:
#
# * @multiline_tag, when one is set and more than one line was buffered
# * "multiline_codec_max_bytes_reached" when #over_maximum_bytes? holds
# * "multiline_codec_max_lines_reached" when #over_maximum_lines? holds
#
# @return [LogStash::Event] the merged event
def merge_events
  merged = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL))

  # Collect the applicable tags first, then apply them in the same order.
  tags = []
  tags << @multiline_tag if @multiline_tag && @buffer.size > 1
  tags << "multiline_codec_max_bytes_reached" if over_maximum_bytes?
  tags << "multiline_codec_max_lines_reached" if over_maximum_lines?
  tags.each { |name| merged.tag(name) }

  merged
end

#over_maximum_bytes? ⇒ Boolean

Returns:

  • (Boolean)


269
270
271
# File 'lib/logstash/codecs/multiline.rb', line 269

# Reports whether the buffered byte total has reached the byte limit.
#
# @return [Boolean] true when @buffer_bytes is greater than or equal to
#   @max_bytes
def over_maximum_bytes?
  limit = @max_bytes
  @buffer_bytes >= limit
end

#over_maximum_lines? ⇒ Boolean

Returns:

  • (Boolean)


265
266
267
# File 'lib/logstash/codecs/multiline.rb', line 265

# Reports whether more lines are buffered than the line limit allows.
#
# @return [Boolean] true when @buffer holds strictly more than @max_lines
#   entries
def over_maximum_lines?
  line_count = @buffer.size
  line_count > @max_lines
end

#register ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/logstash/codecs/multiline.rb', line 137

# Prepares the codec for use: loads grok pattern files, compiles @pattern,
# clears the line buffer, selects the line handler from @what, builds the
# charset converter and, when an auto-flush interval is set, creates the
# auto-flush runner.
#
# @return [Object, nil] the AutoFlush runner when @auto_flush_interval is
#   set, nil otherwise
def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  # The core pattern directory is searched ahead of the configured ones.
  core_dirs = [LogStash::Patterns::Core.path]

  @grok = Grok.new

  @patterns_dir = core_dirs + @patterns_dir
  @patterns_dir.each do |entry|
    # A directory contributes every file directly inside it; any other entry
    # is used as a glob pattern as-is.
    glob = ::File.directory?(entry) ? ::File.join(entry, "*") : entry

    Dir.glob(glob).each do |pattern_file|
      @logger.debug("Grok loading patterns from file", :path => pattern_file)
      @grok.add_patterns_from_file(pattern_file)
    end
  end

  @grok.compile(@pattern)
  @logger.trace("Registered multiline plugin", :type => @type, :config => @config)

  reset_buffer

  # Resolves to the do_<what> method of this object, e.g. do_previous.
  @handler = method(:"do_#{@what}")

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  # The runner is only created here; the line handlers call #start on it.
  @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval) if @auto_flush_interval
end

#reset_buffer ⇒ Object



240
241
242
243
# File 'lib/logstash/codecs/multiline.rb', line 240

# Discards all pending lines and zeroes the byte counter. A new array is
# assigned rather than clearing the existing one in place.
#
# @return [Integer] 0, the value assigned to @buffer_bytes
def reset_buffer
  @buffer = Array.new
  @buffer_bytes = 0
end

#use_mapper_auto_flush ⇒ Object

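Replaces the auto-flush runner with an AutoFlushUnset instance and converts the auto-flush interval to a Float; does nothing when auto-flush is not active.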



174
175
176
177
178
# File 'lib/logstash/codecs/multiline.rb', line 174

# When auto-flush is active, swaps the codec's own runner for an
# AutoFlushUnset (built with two nil arguments) and converts the configured
# interval to a Float. Does nothing when auto-flush is not active.
# NOTE(review): presumably an external mapper then drives the flushing using
# the float interval — confirm against the caller.
#
# @return [Float, nil] the converted interval, or nil when auto-flush is
#   not active
def use_mapper_auto_flush
  if auto_flush_active?
    @auto_flush_runner = AutoFlushUnset.new(nil, nil)
    @auto_flush_interval = @auto_flush_interval.to_f
  end
end

#what_based_listener ⇒ Object



249
250
251
# File 'lib/logstash/codecs/multiline.rb', line 249

# Picks the listener that should receive decoded events: @previous_listener
# when #doing_previous? holds, @last_seen_listener otherwise.
#
# @return [Object] the selected listener
def what_based_listener
  return @previous_listener if doing_previous?

  @last_seen_listener
end