Class: LogStash::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/pipeline.rb

Instance Method Summary collapse

Constructor Details

#initialize(configstr) ⇒ Pipeline

Returns a new instance of Pipeline.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/logstash/pipeline.rb', line 15

def initialize(configstr)
  @logger = Cabin::Channel.get(LogStash)
  grammar = LogStashConfigParser.new
  @config = grammar.parse(configstr)
  if @config.nil?
    raise LogStash::ConfigurationError, grammar.failure_reason
  end

  # This will compile the config to ruby and evaluate the resulting code.
  # The code will initialize all the plugins and define the
  # filter and output methods.
  code = @config.compile
  # The config code is hard to represent as a log message...
  # So just print it.
  @logger.debug? && @logger.debug("Compiled pipeline code:\n#{code}")
  begin
    eval(code)
  rescue => e
    raise
  end

  @input_to_filter = SizedQueue.new(20)

  # If no filters, pipe inputs directly to outputs
  if !filters?
    @filter_to_output = @input_to_filter
  else
    @filter_to_output = SizedQueue.new(20)
  end
  @settings = {
    "filter-workers" => 1,
  }

  @run_mutex = Mutex.new
  @ready = false
  @started = false
  @input_threads = []
end

Instance Method Details

#configure(setting, value) ⇒ Object



62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/pipeline.rb', line 62

def configure(setting, value)
  if setting == "filter-workers"
    # Abort if we have any filters that aren't threadsafe
    if value > 1 && @filters.any? { |f| !f.threadsafe? }
      plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
      raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
    end
  end
  @settings[setting] = value
end

#filter(event, &block) ⇒ Object

for backward compatibility in devutils for the rspec helpers, this method is not used in the pipeline anymore.



299
300
301
302
# File 'lib/logstash/pipeline.rb', line 299

def filter(event, &block)
  # filter_func returns all filtered events, including cancelled ones
  filter_func(event).each { |e| block.call(e) }
end

#filters?Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/logstash/pipeline.rb', line 73

def filters?
  return @filters.any?
end

#filterworkerObject

def inputworker



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/logstash/pipeline.rb', line 210

def filterworker
  LogStash::Util::set_thread_name("|worker")
  begin
    while true
      event = @input_to_filter.pop

      case event
      when LogStash::Event
        # filter_func returns all filtered events, including cancelled ones
        filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
      when LogStash::FlushEvent
        # handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
        # don't have to deal with thread safety implementing the flush method
        flush_filters_to_output!
      when LogStash::ShutdownEvent
        # pass it down to any other filterworker and stop this worker
        @input_to_filter.push(event)
        break
      end
    end
  rescue => e
    @logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
  end

  @filters.each(&:teardown)
end

#flush_filters(options = {}, &block) ⇒ Object

perform filters flush and yeild flushed event to the passed block

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    > true to signal a final shutdown flush



307
308
309
310
311
312
313
# File 'lib/logstash/pipeline.rb', line 307

def flush_filters(options = {}, &block)
  flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

  flushers.each do |flusher|
    flusher.call(options, &block)
  end
end

#flush_filters_to_output!(options = {}) ⇒ Object

perform filters flush into the output queue

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :final (Boolean)

    > true to signal a final shutdown flush



318
319
320
321
322
323
324
325
# File 'lib/logstash/pipeline.rb', line 318

def flush_filters_to_output!(options = {})
  flush_filters(options) do |event|
    unless event.cancelled?
      @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
      @filter_to_output.push(event)
    end
  end
end

#inputworker(plugin) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/logstash/pipeline.rb', line 174

def inputworker(plugin)
  LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
  begin
    plugin.run(@input_to_filter)
  rescue LogStash::ShutdownSignal
    # ignore and quit
  rescue => e
    if @logger.debug?
      @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                           :plugin => plugin.inspect, :error => e.to_s,
                           :exception => e.class,
                           :stacktrace => e.backtrace.join("\n")))
    else
      @logger.error(I18n.t("logstash.pipeline.worker-error",
                           :plugin => plugin.inspect, :error => e))
    end
    puts e.backtrace if @logger.debug?
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
    sleep 1
    retry
  end
ensure
  begin
    # input teardown must be synchronized since is can be called concurrently by
    # the input worker thread and from the pipeline thread shutdown method.
    # this means that input teardown methods must support multiple calls.
    @run_mutex.synchronize{plugin.teardown}
  rescue LogStash::ShutdownSignal
    # teardown could receive the ShutdownSignal, retry it
    retry
  end
end

#outputworkerObject

def filterworker



237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/logstash/pipeline.rb', line 237

def outputworker
  LogStash::Util::set_thread_name(">output")
  @outputs.each(&:worker_setup)

  while true
    event = @filter_to_output.pop
    break if event == LogStash::SHUTDOWN
    output_func(event)
  end # while true

  @outputs.each do |output|
    output.worker_plugins.each(&:teardown)
  end
end

#plugin(plugin_type, name, *args) ⇒ Object

def shutdown



291
292
293
294
295
# File 'lib/logstash/pipeline.rb', line 291

def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end

#ready?Boolean

def initialize

Returns:

  • (Boolean)


54
55
56
# File 'lib/logstash/pipeline.rb', line 54

def ready?
  @run_mutex.synchronize{@ready}
end

#runObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/logstash/pipeline.rb', line 77

def run
  @run_mutex.synchronize{@started = true}

  # synchronize @input_threads between run and shutdown
  @run_mutex.synchronize{start_inputs}
  start_filters if filters?
  start_outputs

  @run_mutex.synchronize{@ready = true}

  @logger.info("Pipeline started")
  @logger.terminal("Logstash startup completed")

  wait_inputs

  if filters?
    shutdown_filters
    wait_filters
    flush_filters_to_output!(:final => true)
  end

  shutdown_outputs
  wait_outputs

  @logger.info("Pipeline shutdown complete.")
  @logger.terminal("Logstash shutdown completed")

  # exit code
  return 0
end

#shutdownObject

Shutdown this pipeline.

This method is intended to be called from another thread



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/logstash/pipeline.rb', line 255

def shutdown
  InflightEventsReporter.logger = @logger
  InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)
  @input_threads.each do |thread|
    # Interrupt all inputs
    @logger.info("Sending shutdown signal to input thread", :thread => thread)

    # synchronize both ShutdownSignal and teardown below. by synchronizing both
    # we will avoid potentially sending a shutdown signal when the inputworker is
    # executing the teardown method.
    @run_mutex.synchronize do
      thread.raise(LogStash::ShutdownSignal)
      begin
        thread.wakeup # in case it's in blocked IO or sleeping
      rescue ThreadError
      end
    end
  end

  # sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
  @inputs.each do |input|
    begin
      # input teardown must be synchronized since is can be called concurrently by
      # the input worker thread and from the pipeline thread shutdown method.
      # this means that input teardown methods must support multiple calls.
      @run_mutex.synchronize{input.teardown}
    rescue LogStash::ShutdownSignal
      # teardown could receive the ShutdownSignal, retry it
      retry
    end
  end

  # No need to send the ShutdownEvent to the filters/outputs nor to wait for
  # the inputs to finish, because in the #run method we wait for that anyway.
end

#shutdown_filtersObject



118
119
120
121
# File 'lib/logstash/pipeline.rb', line 118

def shutdown_filters
  @flusher_thread.kill
  @input_to_filter.push(LogStash::SHUTDOWN)
end

#shutdown_outputsObject



127
128
129
130
# File 'lib/logstash/pipeline.rb', line 127

def shutdown_outputs
  # nothing, filters will do this
  @filter_to_output.push(LogStash::SHUTDOWN)
end

#start_filtersObject



154
155
156
157
158
159
160
161
# File 'lib/logstash/pipeline.rb', line 154

def start_filters
  @filters.each(&:register)
  @filter_threads = @settings["filter-workers"].times.collect do
    Thread.new { filterworker }
  end

  @flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end

#start_input(plugin) ⇒ Object



170
171
172
# File 'lib/logstash/pipeline.rb', line 170

def start_input(plugin)
  @input_threads << Thread.new { inputworker(plugin) }
end

#start_inputsObject



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/logstash/pipeline.rb', line 137

def start_inputs
  moreinputs = []
  @inputs.each do |input|
    if input.threadable && input.threads > 1
      (input.threads - 1).times do |i|
        moreinputs << input.clone
      end
    end
  end
  @inputs += moreinputs

  @inputs.each do |input|
    input.register
    start_input(input)
  end
end

#start_outputsObject



163
164
165
166
167
168
# File 'lib/logstash/pipeline.rb', line 163

def start_outputs
  @outputs.each(&:register)
  @output_threads = [
    Thread.new { outputworker }
  ]
end

#started?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/logstash/pipeline.rb', line 58

def started?
  @run_mutex.synchronize{@started}
end

#wait_filtersObject



123
124
125
# File 'lib/logstash/pipeline.rb', line 123

def wait_filters
  @filter_threads.each(&:join) if @filter_threads
end

#wait_inputsObject

def run



108
109
110
111
112
113
114
115
116
# File 'lib/logstash/pipeline.rb', line 108

def wait_inputs
  @input_threads.each(&:join)
rescue Interrupt
  # rbx does weird things during do SIGINT that I haven't debugged
  # so we catch Interrupt here and signal a shutdown. For some reason the
  # signal handler isn't invoked it seems? I dunno, haven't looked much into
  # it.
  shutdown
end

#wait_outputsObject



132
133
134
135
# File 'lib/logstash/pipeline.rb', line 132

def wait_outputs
  # Wait for the outputs to stop
  @output_threads.each(&:join)
end