Class: LogStash::Outputs::Base

Inherits:
Plugin
  • Object
show all
Includes:
Config::Mixin
Defined in:
lib/logstash/outputs/base.rb

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT, Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0

Constants inherited from Plugin

Plugin::NL

Instance Attribute Summary collapse

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

#initialize(params = {}) ⇒ Base

Returns a new instance of Base.



50
51
52
53
# File 'lib/logstash/outputs/base.rb', line 50

def initialize(params={})
  super
  config_init(params)
end

Instance Attribute Details

#worker_pluginsObject (readonly)

Returns the value of attribute worker_plugins.



36
37
38
# File 'lib/logstash/outputs/base.rb', line 36

def worker_plugins
  @worker_plugins
end

Instance Method Details

#handle(event) ⇒ Object



87
88
89
# File 'lib/logstash/outputs/base.rb', line 87

def handle(event)
  receive(event)
end

#handle_worker(event) ⇒ Object

def handle



91
92
93
# File 'lib/logstash/outputs/base.rb', line 91

def handle_worker(event)
  @worker_queue.push(event)
end

#receive(event) ⇒ Object



61
62
63
# File 'lib/logstash/outputs/base.rb', line 61

def receive(event)
  raise "#{self.class}#receive must be overidden"
end

#registerObject



56
57
58
# File 'lib/logstash/outputs/base.rb', line 56

def register
  raise "#{self.class}#register must be overidden"
end

#worker_setupObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/logstash/outputs/base.rb', line 66

def worker_setup
  if @workers == 1
    @worker_plugins = [self]
  else
    define_singleton_method(:handle, method(:handle_worker))
    @worker_queue = SizedQueue.new(20)
    @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) }
    @worker_plugins.map.with_index do |plugin, i|
      Thread.new(original_params, @worker_queue) do |params, queue|
        LogStash::Util::set_thread_name(">#{self.class.config_name}.#{i}")
        plugin.register
        while true
          event = queue.pop
          plugin.handle(event)
        end
      end
    end
  end
end

#workers_not_supported(message = nil) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/logstash/outputs/base.rb', line 39

def workers_not_supported(message=nil)
  return if @workers == 1
  if message
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
  else
    @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
  end
  @workers = 1
end