Class: LogStash::Codecs::Joinlines

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/joinlines.rb

Instance Method Summary collapse

Instance Method Details

#accept(listener) ⇒ Object



144
145
146
147
148
149
150
151
152
# File 'lib/logstash/codecs/joinlines.rb', line 144

def accept(listener)
  # memoize references to listener that holds upstream state
  @previous_listener = @last_seen_listener || listener
  @last_seen_listener = listener

  internal_decode(listener.data) do |event,what|
    what_based_listener(what).process_event(event)
  end
end

#auto_flush(listener = @last_seen_listener) ⇒ Object



226
227
228
229
230
231
232
# File 'lib/logstash/codecs/joinlines.rb', line 226

def auto_flush(listener = @last_seen_listener)
  return if listener.nil?

  flush do |event|
    listener.process_event(event)
  end
end

#auto_flush_active?Boolean

Returns:

  • (Boolean)


288
289
290
# File 'lib/logstash/codecs/joinlines.rb', line 288

def auto_flush_active?
  !@auto_flush_interval.nil?
end

#auto_flush_runnerObject



292
293
294
# File 'lib/logstash/codecs/joinlines.rb', line 292

def auto_flush_runner
  @auto_flush_runner || AutoFlushUnset.new(nil, nil)
end

#buffer(text) ⇒ Object

def decode



204
205
206
207
# File 'lib/logstash/codecs/joinlines.rb', line 204

def buffer(text)
  @buffer_bytes += text.bytesize
  @buffer.push(text)
end

#buffer_over_limits?Boolean

Returns:

  • (Boolean)


275
276
277
# File 'lib/logstash/codecs/joinlines.rb', line 275

def buffer_over_limits?
  over_maximum_lines? || over_maximum_bytes?
end

#closeObject

def encode



284
285
286
# File 'lib/logstash/codecs/joinlines.rb', line 284

def close
  auto_flush_runner.stop
end

#decode(text, &block) ⇒ Object



198
199
200
201
202
# File 'lib/logstash/codecs/joinlines.rb', line 198

def decode(text, &block)
  internal_decode(text) do |event,what|
    yield(event)
  end
end

#do_next(text, matched, &block) ⇒ Object



255
256
257
258
259
# File 'lib/logstash/codecs/joinlines.rb', line 255

def do_next(text, matched, &block)
  buffer(text)
  auto_flush_runner.start
  flush(&block) if !matched || buffer_over_limits?
end

#do_previous(text, matched, &block) ⇒ Object



261
262
263
264
265
# File 'lib/logstash/codecs/joinlines.rb', line 261

def do_previous(text, matched, &block)
  flush(&block) if !matched || buffer_over_limits?
  auto_flush_runner.start
  buffer(text)
end

#doing_previous?(what) ⇒ Boolean

Returns:

  • (Boolean)


247
248
249
# File 'lib/logstash/codecs/joinlines.rb', line 247

def doing_previous?(what)
  what != "next"
end

#encode(event) ⇒ Object



279
280
281
282
# File 'lib/logstash/codecs/joinlines.rb', line 279

def encode(event)
  # Nothing to do.
  @on_event.call(event, event)
end

#flush(&block) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/logstash/codecs/joinlines.rb', line 209

def flush(&block)
  if block_given? && @buffer.any?
    no_error = true
    events = merge_events
    begin
      yield events
    rescue ::Exception => e
      # need to rescue everything
      # likliest cause: backpressure or timeout by exception
      # can't really do anything but leave the data in the buffer for next time if there is one
      @logger.error("Joinlines: flush downstream error", :exception => e)
      no_error = false
    end
    reset_buffer if no_error
  end
end

#initialize_copy(source) ⇒ Object



296
297
298
299
# File 'lib/logstash/codecs/joinlines.rb', line 296

def initialize_copy(source)
  super
  register
end

#internal_decode(text, &block) ⇒ Object

private



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/logstash/codecs/joinlines.rb', line 159

def internal_decode(text, &block)
  do_flush = false
  text = @converter.convert(text)
  text.split("\n").each do |line|
    matched = false
    zip_config.each do |pattern,what,negate,grok,handler|
      match = grok.match(line)
      @logger.debug("Joinlines", :pattern => pattern, :text => line,
                    :match => (match != false), :negate => negate)

      # Add negate option
      match = (match and !negate) || (!match and negate)

      if match
        do_flush = (what == "next" and @matching != "next")
        matched = true
        @matching = what
        break
      end
    end

    if !matched
      do_flush = (@matching != "next")
      @matching = ""
    end

    if do_flush
      flush do |event|
        yield(event,@matching)
      end
      do_flush = false
    end

    auto_flush_runner.start
    buffer(line)
  end
end

#merge_eventsObject



234
235
236
237
238
239
240
# File 'lib/logstash/codecs/joinlines.rb', line 234

def merge_events
  event = LogStash::Event.new(LogStash::Event::TIMESTAMP => @time, "message" => @buffer.join(NL))
  event.tag @multiline_tag if !@multiline_tag.empty? && @buffer.size > 1
  event.tag "joinlines_codec_max_bytes_reached" if over_maximum_bytes?
  event.tag "joinlines_codec_max_lines_reached" if over_maximum_lines?
  event
end

#over_maximum_bytes?Boolean

Returns:

  • (Boolean)


271
272
273
# File 'lib/logstash/codecs/joinlines.rb', line 271

def over_maximum_bytes?
  @buffer_bytes >= @max_bytes
end

#over_maximum_lines?Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/logstash/codecs/joinlines.rb', line 267

def over_maximum_lines?
  @buffer.size > @max_lines
end

#registerObject



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/logstash/codecs/joinlines.rb', line 91

def register
  require "grok-pure" # rubygem 'jls-grok'
  require 'logstash/patterns/core'

  @matching = ""

  # Detect if we are running from a jarfile, pick the right path.
  patterns_path = []
  patterns_path += [LogStash::Patterns::Core.path]

  @patterns_dir = patterns_path.to_a + @patterns_dir
  @groks = []
  @handlers = []

  @patterns.zip(@what).each do |pattern,what|
    grok = Grok.new

    @patterns_dir.each do |path|
      if ::File.directory?(path)
        path = ::File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.debug("Grok loading patterns from file", :path => file)
        grok.add_patterns_from_file(file)
      end
    end

    grok.compile(pattern)
    handler = method("do_#{what}".to_sym)

    @groks.push(grok)
    @handlers.push(handler)
  end

  @logger.trace("Registered joinlines plugin", :type => @type, :config => @config)
  reset_buffer

  @converter = LogStash::Util::Charset.new(@charset)
  @converter.logger = @logger

  if @auto_flush_interval
    # will start on first decode
    @auto_flush_runner = AutoFlush.new(self, @auto_flush_interval)
  end
end

#reset_bufferObject



242
243
244
245
# File 'lib/logstash/codecs/joinlines.rb', line 242

def reset_buffer
  @buffer = []
  @buffer_bytes = 0
end

#use_mapper_auto_flushObject

def register



138
139
140
141
142
# File 'lib/logstash/codecs/joinlines.rb', line 138

def use_mapper_auto_flush
  return unless auto_flush_active?
  @auto_flush_runner = AutoFlushUnset.new(nil, nil)
  @auto_flush_interval = @auto_flush_interval.to_f
end

#what_based_listener(what) ⇒ Object



251
252
253
# File 'lib/logstash/codecs/joinlines.rb', line 251

def what_based_listener(what)
  doing_previous?(what) ? @previous_listener : @last_seen_listener
end

#zip_configObject



154
155
156
# File 'lib/logstash/codecs/joinlines.rb', line 154

def zip_config
  @patterns.zip(@what, @negate, @groks, @handlers)
end