Class: LogStash::Outputs::Base

Inherits:
Plugin
  • Object
show all
Includes:
Config::Mixin
Defined in:
lib/logstash/outputs/base.rb

Constant Summary

Constants included from Config::Mixin

Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0

Constants inherited from Plugin

Plugin::NL

Instance Attribute Summary collapse

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#close, #debug_info, #do_close, #eql?, #hash, #inspect, lookup, #to_s

Constructor Details

#initialize(params = {}) ⇒ Base

Returns a new instance of Base.



40
41
42
43
# File 'lib/logstash/outputs/base.rb', line 40

def initialize(params={})
  super
  config_init(params)
end

Instance Attribute Details

#worker_pluginsObject (readonly)

Returns the value of attribute worker_plugins.



26
27
28
# File 'lib/logstash/outputs/base.rb', line 26

def worker_plugins
  @worker_plugins
end

#worker_queueObject (readonly)

Returns the value of attribute worker_queue.



26
27
28
# File 'lib/logstash/outputs/base.rb', line 26

def worker_queue
  @worker_queue
end

#worker_threadsObject (readonly)

Returns the value of attribute worker_threads.



26
27
28
# File 'lib/logstash/outputs/base.rb', line 26

def worker_threads
  @worker_threads
end

Instance Method Details

#handle(event) ⇒ Object



79
80
81
82
# File 'lib/logstash/outputs/base.rb', line 79

def handle(event)
  LogStash::Util.set_thread_plugin(self)
  receive(event)
end

#handle_worker(event) ⇒ Object

def handle



84
85
86
87
# File 'lib/logstash/outputs/base.rb', line 84

def handle_worker(event)
  LogStash::Util.set_thread_plugin(self)
  @worker_queue.push(event)
end

#receive(event) ⇒ Object



51
52
53
# File 'lib/logstash/outputs/base.rb', line 51

def receive(event)
  raise "#{self.class}#receive must be overidden"
end

#registerObject



46
47
48
# File 'lib/logstash/outputs/base.rb', line 46

def register
  raise "#{self.class}#register must be overidden"
end

#worker_setupObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/outputs/base.rb', line 56

def worker_setup
  if @workers == 1
    @worker_plugins = [self]
    @worker_threads = []
  else
    define_singleton_method(:handle, method(:handle_worker))
    @worker_queue = SizedQueue.new(20)
    @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
    @worker_threads = @worker_plugins.map.with_index do |plugin, i|
      Thread.new(original_params, @worker_queue) do |params, queue|
        LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}")
        LogStash::Util.set_thread_plugin(self)
        plugin.register
        while true
          event = queue.pop
          plugin.handle(event)
        end
      end
    end
  end
end

#workers_not_supported(message = nil) ⇒ Object



29
30
31
32
33
34
35
36
37
# File 'lib/logstash/outputs/base.rb', line 29

def workers_not_supported(message=nil)
  return if @workers == 1
  if message
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
  else
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
  end
  @workers = 1
end