Class: LogStash::Pipeline

Inherits: Object
Defined in:
lib/logstash/pipeline.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(configstr) ⇒ Pipeline

Returns a new instance of Pipeline.



# File 'lib/logstash/pipeline.rb', line 19

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)

  @inputs = nil
  @filters = nil
  @outputs = nil

  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)

  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end
  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile
  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)
  # if no filters, pipe inputs directly to outputs
  @filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter

  @settings = {
    "default-filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent
  }

  # @ready requires thread safety since it is typically polled from outside the pipeline thread
  @ready = Concurrent::AtomicBoolean.new(false)
  @input_threads = []
  @filter_threads = []
end
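
Example (a minimal usage sketch; the stdin/stdout plugin names and the require path are illustrative and assume the corresponding plugins and load path are available):

require "logstash/pipeline"

config = <<-CONFIG
  input { stdin { } }
  output { stdout { } }
CONFIG

# Raises LogStash::ConfigurationError if the config string cannot be parsed.
pipeline = LogStash::Pipeline.new(config)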

Instance Attribute Details

#filter_to_output ⇒ Object (readonly)

Returns the value of attribute filter_to_output.



# File 'lib/logstash/pipeline.rb', line 17

def filter_to_output
  @filter_to_output
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



# File 'lib/logstash/pipeline.rb', line 17

def filters
  @filters
end

#input_to_filter ⇒ Object (readonly)

Returns the value of attribute input_to_filter.



# File 'lib/logstash/pipeline.rb', line 17

def input_to_filter
  @input_to_filter
end

#inputs ⇒ Object (readonly)

Returns the value of attribute inputs.



# File 'lib/logstash/pipeline.rb', line 17

def inputs
  @inputs
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



# File 'lib/logstash/pipeline.rb', line 17

def outputs
  @outputs
end

Instance Method Details

#configure(setting, value) ⇒ Object



# File 'lib/logstash/pipeline.rb', line 63

def configure(setting, value)
  @settings[setting] = value
end
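
Example (a sketch of overriding the "filter-workers" setting consulted by #safe_filter_worker_count; the value 4 is illustrative and mirrors the "-w 4" command line override):

pipeline.configure("filter-workers", 4)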

#filter(event, &block) ⇒ Object

For backward compatibility with the rspec helpers in devutils; this method is no longer used by the pipeline itself.



# File 'lib/logstash/pipeline.rb', line 310

def filter(event, &block)
  # filter_func returns all filtered events, including cancelled ones
  filter_func(event).each { |e| block.call(e) }
end
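
Example (a sketch of the devutils/rspec-style usage on an already constructed pipeline; the event payload is illustrative):

event = LogStash::Event.new("message" => "hello world")
pipeline.filter(event) do |filtered_event|
  # filter_func also yields cancelled events, so callers typically skip them
  next if filtered_event.cancelled?
  puts filtered_event.to_hash
end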

#filters? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/pipeline.rb', line 92

def filters?
  return @filters.any?
end

#filterworker ⇒ Object

Filter worker loop: pops events from the input-to-filter queue, runs them through the compiled filter_func, and pushes uncancelled results toward the outputs; a FlushEvent triggers a filter flush and a ShutdownEvent stops the worker.



# File 'lib/logstash/pipeline.rb', line 236

def filterworker
  begin
    while true
      event = @input_to_filter.pop

      case event
      when LogStash::Event
        # filter_func returns all filtered events, including cancelled ones
        filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
      when LogStash::FlushEvent
        # handle filter flushing here so that non-threadsafe filters (which therefore run
        # with only one filterworker) don't have to deal with thread safety when implementing the flush method
        flush_filters_to_output!
      when LogStash::ShutdownEvent
        # pass it down to any other filterworker and stop this worker
        @input_to_filter.push(event)
        break
      end
    end
  rescue Exception => e
    # Plugin authors should manage their own exceptions in the plugin code
    # but if an exception is raised up to the worker thread it is considered
    # fatal and Logstash will not recover from this situation.
    #
    # Users need to check their configuration or see if there is a bug in the
    # plugin.
    @logger.error("Exception in filterworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
                  "exception" => e, "backtrace" => e.backtrace)
    raise
  ensure
    @filters.each(&:do_close)
  end
end
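
A sketch of how the shutdown marker travels through the internal queues (illustrative; in practice #shutdown_filters and #shutdown_outputs push these markers for you):

pipeline.input_to_filter.push(LogStash::SHUTDOWN)   # each filterworker re-pushes it and exits
pipeline.filter_to_output.push(LogStash::SHUTDOWN)  # outputworker breaks out of its loop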

#flush_filters(options = {}, &block) ⇒ Object

Performs a filter flush and yields each flushed event to the given block.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    true to signal a final shutdown flush



# File 'lib/logstash/pipeline.rb', line 318

def flush_filters(options = {}, &block)
  flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

  flushers.each do |flusher|
    flusher.call(options, &block)
  end
end
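
Example (a minimal sketch on a running pipeline; assumes the periodic and shutdown flushers were registered while the filters were set up):

pipeline.flush_filters(:final => true) do |event|
  puts event.to_hash unless event.cancelled?
end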

#flush_filters_to_output!(options = {}) ⇒ Object

Performs a filter flush and pushes the flushed events into the output queue.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    true to signal a final shutdown flush



# File 'lib/logstash/pipeline.rb', line 329

def flush_filters_to_output!(options = {})
  flush_filters(options) do |event|
    unless event.cancelled?
      @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
      @filter_to_output.push(event)
    end
  end
end

#inflight_count ⇒ Object

Returns a hash of in-flight event counts for the internal queues and the output worker queues.



# File 'lib/logstash/pipeline.rb', line 338

def inflight_count
  data = {}
  total = 0

  input_to_filter = @input_to_filter.size
  total += input_to_filter
  filter_to_output = @filter_to_output.size
  total += filter_to_output

  data["input_to_filter"] = input_to_filter if input_to_filter > 0
  data["filter_to_output"] = filter_to_output if filter_to_output > 0

  output_worker_queues = []
  @outputs.each do |output|
    next unless output.worker_queue && output.worker_queue.size > 0
    plugin_info = output.debug_info
    size = output.worker_queue.size
    total += size
    plugin_info << size
    output_worker_queues << plugin_info
  end
  data["output_worker_queues"] = output_worker_queues unless output_worker_queues.empty?
  data["total"] = total
  data
end
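
Example return value (illustrative numbers; a queue key is present only when that queue is non-empty):

pipeline.inflight_count
# => { "input_to_filter" => 3, "filter_to_output" => 1, "total" => 4 }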

#inputworker(plugin) ⇒ Object



# File 'lib/logstash/pipeline.rb', line 202

def inputworker(plugin)
  LogStash::Util.set_thread_name("<#{plugin.class.config_name}")
  LogStash::Util.set_thread_plugin(plugin)
  begin
    plugin.run(@input_to_filter)
  rescue => e
    # if the plugin is stopping, ignore uncaught exceptions and exit the worker
    if plugin.stop?
      @logger.debug("Input plugin raised exception during shutdown, ignoring it.",
                    :plugin => plugin.class.config_name, :exception => e,
                    :backtrace => e.backtrace)
      return
    end

    # otherwise, report error and restart
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end

    # Assuming the failure that caused this exception is transient,
    # let's sleep for a bit and execute #run again
    sleep(1)
    retry
  ensure
    plugin.do_close
  end
end

#outputworker ⇒ Object

Output worker loop: pops events from the filter-to-output queue and hands them to the compiled output_func until a shutdown event is received.



# File 'lib/logstash/pipeline.rb', line 270

def outputworker
  LogStash::Util.set_thread_name(">output")
  @outputs.each(&:worker_setup)

  while true
    event = @filter_to_output.pop
    break if event == LogStash::SHUTDOWN
    output_func(event)
    LogStash::Util.set_thread_plugin(nil)
  end
ensure
  @outputs.each do |output|
    output.worker_plugins.each(&:do_close)
  end
end

#plugin(plugin_type, name, *args) ⇒ Object

Instantiates a plugin of the given type and name via LogStash::Plugin.lookup.



# File 'lib/logstash/pipeline.rb', line 302

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end
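
Example (a sketch; the "stdin" input plugin is illustrative and must be installed):

input = pipeline.plugin("input", "stdin")
# equivalent to LogStash::Plugin.lookup("input", "stdin").new({})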

#plugin_threads ⇒ Object



# File 'lib/logstash/pipeline.rb', line 372

def plugin_threads
  input_threads = @input_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
  filter_threads = @filter_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
  output_threads = @output_threads.select {|t| t.alive? }.map {|t| thread_info(t) }
  output_worker_threads = @outputs.flat_map {|output| output.worker_threads }.map {|t| thread_info(t) }
  input_threads + filter_threads + output_threads + output_worker_threads
end

#ready? ⇒ Boolean

Returns true once the startup sequence (inputs, filters, outputs) has completed.

Returns:

  • (Boolean)


# File 'lib/logstash/pipeline.rb', line 59

def ready?
  @ready.value
end

#run ⇒ Object



# File 'lib/logstash/pipeline.rb', line 96

def run
  @logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))

  begin
    start_inputs
    start_filters if filters?
    start_outputs
  ensure
    # it is important to guarantee @ready is true after the startup sequence has been completed
    # to potentially unblock the shutdown method which may be waiting on @ready to proceed
    @ready.make_true
  end

  @logger.info("Pipeline started")
  @logger.terminal("Logstash startup completed")

  wait_inputs

  if filters?
    shutdown_filters
    wait_filters
    flush_filters_to_output!(:final => true)
  end

  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")
  @logger.terminal("Logstash shutdown completed")

  # exit code
  return 0
end
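
Example (a minimal sketch of driving the run/shutdown lifecycle from another thread; `pipeline` is an instance built as in the constructor example above):

pipeline_thread = Thread.new { pipeline.run }
sleep(0.01) until pipeline.ready?   # wait for the startup sequence to finish
pipeline.shutdown                   # stops the inputs; filters and outputs then drain and exit
pipeline_thread.join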

#safe_filter_worker_count ⇒ Object



# File 'lib/logstash/pipeline.rb', line 67

def safe_filter_worker_count
  default = @settings["default-filter-workers"]
  thread_count = @settings["filter-workers"] #override from args "-w 8" or config
  safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)
  if unsafe_filters.any?
    plugins = unsafe_filters.collect { |f| f.class.config_name }
    case thread_count
    when nil
      # user did not specify a worker thread count
      # warn if the default is multiple
      @logger.warn("Defaulting filter worker threads to 1 because there are some filters that might not work with multiple worker threads",
        :count_was => default, :filters => plugins) if default > 1
      1 # can't allow the default value to propagate if there are unsafe filters
    when 0, 1
      1
    else
      @logger.warn("Warning: Manual override - there are filters that might not work with multiple worker threads",
        :worker_threads => thread_count, :filters => plugins)
      thread_count # allow user to force this even if there are unsafe filters
    end
  else
    thread_count || default
  end
end
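
Illustrative outcomes of the worker-count decision above (assuming "default-filter-workers" resolved to 4):

# all filters threadsafe, no "-w" override          => 4 (the default)
# a non-threadsafe filter present, no override      => 1, with a warning
# a non-threadsafe filter present, explicit "-w 8"  => 8, forced, with a warning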

#shutdown(&before_stop) ⇒ Object

Initiates the pipeline shutdown sequence. This method is intended to be called from outside the pipeline thread.

Parameters:

  • before_stop (Proc)

    code block called before the stop operation is performed on the input plugins



# File 'lib/logstash/pipeline.rb', line 289

def shutdown(&before_stop)
  # shutdown can only start once the pipeline has completed its startup.
  # avoid a potential race condition between the startup sequence and this
  # shutdown method which can be called from another thread at any time
  sleep(0.1) while !ready?

  # TODO: should we also check against calling shutdown multiple times concurrently?

  before_stop.call if block_given?

  @inputs.each(&:do_stop)
end
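
Example (a sketch of the optional before_stop block, which runs before #do_stop is called on each input; the log message is illustrative):

pipeline.shutdown do
  puts "shutdown requested, stopping inputs"
end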

#shutdown_filters ⇒ Object



# File 'lib/logstash/pipeline.rb', line 134

def shutdown_filters
  @flusher_thread.kill
  @input_to_filter.push(LogStash::SHUTDOWN)
end

#shutdown_outputs ⇒ Object



# File 'lib/logstash/pipeline.rb', line 143

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::SHUTDOWN)
end

#stalling_threads ⇒ Object



# File 'lib/logstash/pipeline.rb', line 364

def stalling_threads
  plugin_threads
   .reject {|t| t["blocked_on"] } # known benign blocking statuses
   .each {|t| t.delete("backtrace") }
   .each {|t| t.delete("blocked_on") }
   .each {|t| t.delete("status") }
end

#start_filters ⇒ Object



# File 'lib/logstash/pipeline.rb', line 170

def start_filters
  @filters.each(&:register)
  # dynamically get thread count based on filter threadsafety
  # moved this test to here to allow for future config reloading
  to_start = safe_filter_worker_count
  @filter_threads = to_start.times.collect do |i|
    Thread.new do
      LogStash::Util.set_thread_name("|filterworker.#{i}")
      filterworker
    end
  end
  actually_started = @filter_threads.select(&:alive?).size
  msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
  if actually_started < to_start
    @logger.warn(msg)
  else
    @logger.info(msg)
  end
  @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end

#start_input(plugin) ⇒ Object



# File 'lib/logstash/pipeline.rb', line 198

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end

#start_inputs ⇒ Object



# File 'lib/logstash/pipeline.rb', line 153

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end

#start_outputs ⇒ Object



# File 'lib/logstash/pipeline.rb', line 191

def start_outputs
  @outputs.each(&:register)
  @output_threads = [
    Thread.new { outputworker }
  ]
end

#thread_info(thread) ⇒ Object



# File 'lib/logstash/pipeline.rb', line 380

def thread_info(thread)
  LogStash::Util.thread_info(thread)
end

#wait_filters ⇒ Object



# File 'lib/logstash/pipeline.rb', line 139

def wait_filters
  @filter_threads.each(&:join)
end

#wait_inputs ⇒ Object

Blocks until all input threads have finished.



# File 'lib/logstash/pipeline.rb', line 130

def wait_inputs
  @input_threads.each(&:join)
end

#wait_outputs ⇒ Object



# File 'lib/logstash/pipeline.rb', line 148

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end