Class: LogStash::OutputDelegator
- Inherits:
-
Object
- Object
- LogStash::OutputDelegator
- Defined in:
- lib/logstash/output_delegator.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#threadsafe ⇒ Object
readonly
Returns the value of attribute threadsafe.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
-
#busy_workers ⇒ Object
There’s no concept of ‘busy’ workers for a threadsafe plugin!.
- #config_name ⇒ Object
- #do_close ⇒ Object
- #events_received ⇒ Object
-
#initialize(logger, klass, default_worker_count, *plugin_args) ⇒ OutputDelegator
constructor
The *args this takes are the same format that a Outputs::Base takes.
- #register ⇒ Object
- #setup_additional_workers!(target_worker_count) ⇒ Object
- #setup_multi_receive! ⇒ Object
- #target_worker_count ⇒ Object
- #threadsafe? ⇒ Boolean
- #threadsafe_multi_receive(events) ⇒ Object
- #warn_on_worker_override! ⇒ Object
- #worker_count ⇒ Object
- #worker_limits_overriden? ⇒ Boolean
- #worker_multi_receive(events) ⇒ Object
Constructor Details
#initialize(logger, klass, default_worker_count, *plugin_args) ⇒ OutputDelegator
The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them Internally these just get merged together into a single hash
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/logstash/output_delegator.rb', line 16 def initialize(logger, klass, default_worker_count, *plugin_args) @logger = logger @threadsafe = klass.threadsafe? @config = plugin_args.reduce({}, :merge) @klass = klass @workers = java.util.concurrent.CopyOnWriteArrayList.new @default_worker_count = default_worker_count @registered = false @events_received = Concurrent::AtomicFixnum.new(0) end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
12 13 14 |
# File 'lib/logstash/output_delegator.rb', line 12 def config @config end |
#threadsafe ⇒ Object (readonly)
Returns the value of attribute threadsafe.
12 13 14 |
# File 'lib/logstash/output_delegator.rb', line 12 def threadsafe @threadsafe end |
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
12 13 14 |
# File 'lib/logstash/output_delegator.rb', line 12 def workers @workers end |
Instance Method Details
#busy_workers ⇒ Object
There’s no concept of ‘busy’ workers for a threadsafe plugin!
154 155 156 157 158 159 160 161 162 163 |
# File 'lib/logstash/output_delegator.rb', line 154 def busy_workers if @threadsafe 0 else # The pipeline reporter can run before the outputs are registered trying to pull a value here # In that case @worker_queue is empty, we just return 0 return 0 unless @worker_queue @workers.size - @worker_queue.size end end |
#config_name ⇒ Object
60 61 62 |
# File 'lib/logstash/output_delegator.rb', line 60 def config_name @klass.config_name end |
#do_close ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/logstash/output_delegator.rb', line 136 def do_close @logger.debug("closing output delegator", :klass => @klass) if @threadsafe @workers.each(&:do_close) else worker_count.times do worker = @worker_queue.pop worker.do_close end end end |
#events_received ⇒ Object
149 150 151 |
# File 'lib/logstash/output_delegator.rb', line 149 def events_received @events_received.value end |
#register ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/logstash/output_delegator.rb', line 64 def register raise ArgumentError, "Attempted to register #{self} twice!" if @registered @registered = true # We define this as an array regardless of threadsafety # to make reporting simpler, even though a threadsafe plugin will just have # a single instance # # Older plugins invoke the instance method Outputs::Base#workers_not_supported # To detect these we need an instance to be created first :() # TODO: In the next major version after 2.x remove support for this @workers << @klass.new(@config) @workers.first.register # Needed in case register calls `workers_not_supported` @logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass) # Threadsafe versions don't need additional workers setup_additional_workers!(target_worker_count) unless @threadsafe # We skip the first worker because that's pre-registered to deal with legacy workers_not_supported @workers.subList(1,@workers.size).each(&:register) setup_multi_receive! end |
#setup_additional_workers!(target_worker_count) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/logstash/output_delegator.rb', line 86 def setup_additional_workers!(target_worker_count) warn_on_worker_override! (target_worker_count - 1).times do inst = @klass.new(@config) @workers << inst end # This queue is used to manage sharing across threads @worker_queue = SizedQueue.new(target_worker_count) @workers.each {|w| @worker_queue << w } end |
#setup_multi_receive! ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/logstash/output_delegator.rb', line 99 def setup_multi_receive! # One might wonder why we don't use something like # define_singleton_method(:multi_receive, method(:threadsafe_multi_receive) # and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time! # The other 2% you get weird errors about rebinding to the same object # Until we switch to Jruby 9.x keep the define_singleton_method parts # the way they are, with a block # See https://github.com/jruby/jruby/issues/3582 if threadsafe? @threadsafe_worker = @workers.first define_singleton_method(:multi_receive) do |events| threadsafe_multi_receive(events) end else define_singleton_method(:multi_receive) do |events| worker_multi_receive(events) end end end |
#target_worker_count ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/logstash/output_delegator.rb', line 49 def target_worker_count # Remove in 5.0 after all plugins upgraded to use class level declarations raise ArgumentError, "Attempted to detect target worker count before instantiating a worker to test for legacy workers_not_supported!" if @workers.size == 0 if @threadsafe || @klass.workers_not_supported? 1 else @config["workers"] || @default_worker_count end end |
#threadsafe? ⇒ Boolean
27 28 29 |
# File 'lib/logstash/output_delegator.rb', line 27 def threadsafe? !!@threadsafe end |
#threadsafe_multi_receive(events) ⇒ Object
119 120 121 122 123 |
# File 'lib/logstash/output_delegator.rb', line 119 def threadsafe_multi_receive(events) @events_received.increment(events.length) @threadsafe_worker.multi_receive(events) end |
#warn_on_worker_override! ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/logstash/output_delegator.rb', line 31 def warn_on_worker_override! # The user has configured extra workers, but this plugin doesn't support it :( if worker_limits_overriden? = @klass. = {:plugin => @klass.config_name, :worker_count => @config["workers"]} if [:message] = @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", )) else @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", )) end end end |
#worker_count ⇒ Object
165 166 167 |
# File 'lib/logstash/output_delegator.rb', line 165 def worker_count @workers.size end |
#worker_limits_overriden? ⇒ Boolean
45 46 47 |
# File 'lib/logstash/output_delegator.rb', line 45 def worker_limits_overriden? @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported? end |
#worker_multi_receive(events) ⇒ Object
125 126 127 128 129 130 131 132 133 134 |
# File 'lib/logstash/output_delegator.rb', line 125 def worker_multi_receive(events) @events_received.increment(events.length) worker = @worker_queue.pop begin worker.multi_receive(events) ensure @worker_queue.push(worker) end end |