Class: LogStash::Outputs::Base
- Includes:
- Config::Mixin
- Defined in:
- lib/logstash/outputs/base.rb
Constant Summary
Constants included from Config::Mixin
Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0
Constants inherited from Plugin
Instance Attribute Summary collapse
-
#worker_plugins ⇒ Object
readonly
Returns the value of attribute worker_plugins.
-
#worker_queue ⇒ Object
readonly
Returns the value of attribute worker_queue.
-
#worker_threads ⇒ Object
readonly
Returns the value of attribute worker_threads.
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
- #handle(event) ⇒ Object
-
#handle_worker(event) ⇒ Object
def handle.
-
#initialize(params = {}) ⇒ Base
constructor
A new instance of Base.
- #receive(event) ⇒ Object
- #register ⇒ Object
- #worker_setup ⇒ Object
- #workers_not_supported(message = nil) ⇒ Object
Methods included from Config::Mixin
Methods inherited from Plugin
#close, #debug_info, #do_close, #eql?, #hash, #inspect, lookup, #to_s
Constructor Details
#initialize(params = {}) ⇒ Base
Returns a new instance of Base.
40 41 42 43 |
# File 'lib/logstash/outputs/base.rb', line 40 def initialize(params={}) super config_init(params) end |
Instance Attribute Details
#worker_plugins ⇒ Object (readonly)
Returns the value of attribute worker_plugins.
26 27 28 |
# File 'lib/logstash/outputs/base.rb', line 26 def worker_plugins @worker_plugins end |
#worker_queue ⇒ Object (readonly)
Returns the value of attribute worker_queue.
26 27 28 |
# File 'lib/logstash/outputs/base.rb', line 26 def worker_queue @worker_queue end |
#worker_threads ⇒ Object (readonly)
Returns the value of attribute worker_threads.
26 27 28 |
# File 'lib/logstash/outputs/base.rb', line 26 def worker_threads @worker_threads end |
Instance Method Details
#handle(event) ⇒ Object
79 80 81 82 |
# File 'lib/logstash/outputs/base.rb', line 79 def handle(event) LogStash::Util.set_thread_plugin(self) receive(event) end |
#handle_worker(event) ⇒ Object
def handle
84 85 86 87 |
# File 'lib/logstash/outputs/base.rb', line 84 def handle_worker(event) LogStash::Util.set_thread_plugin(self) @worker_queue.push(event) end |
#receive(event) ⇒ Object
51 52 53 |
# File 'lib/logstash/outputs/base.rb', line 51 def receive(event) raise "#{self.class}#receive must be overidden" end |
#register ⇒ Object
46 47 48 |
# File 'lib/logstash/outputs/base.rb', line 46 def register raise "#{self.class}#register must be overidden" end |
#worker_setup ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/logstash/outputs/base.rb', line 56 def worker_setup if @workers == 1 @worker_plugins = [self] @worker_threads = [] else define_singleton_method(:handle, method(:handle_worker)) @worker_queue = SizedQueue.new(20) @worker_plugins = @workers.times.map { self.class.new(@original_params.merge("workers" => 1)) } @worker_threads = @worker_plugins.map.with_index do |plugin, i| Thread.new(original_params, @worker_queue) do |params, queue| LogStash::Util.set_thread_name(">#{self.class.config_name}.#{i}") LogStash::Util.set_thread_plugin(self) plugin.register while true event = queue.pop plugin.handle(event) end end end end end |
#workers_not_supported(message = nil) ⇒ Object
29 30 31 32 33 34 35 36 37 |
# File 'lib/logstash/outputs/base.rb', line 29 def workers_not_supported(=nil) return if @workers == 1 if @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => )) else @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers)) end @workers = 1 end |