Class: LogStash::OutputDelegator

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/output_delegator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, klass, default_worker_count, *plugin_args) ⇒ OutputDelegator

The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them Internally these just get merged together into a single hash



16
17
18
19
20
21
22
23
24
25
# File 'lib/logstash/output_delegator.rb', line 16

def initialize(logger, klass, default_worker_count, *plugin_args)
  @logger = logger
  @threadsafe = klass.threadsafe?
  @config = plugin_args.reduce({}, :merge)
  @klass = klass
  @workers = java.util.concurrent.CopyOnWriteArrayList.new
  @default_worker_count = default_worker_count
  @registered = false
  @events_received = Concurrent::AtomicFixnum.new(0)
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



12
13
14
# File 'lib/logstash/output_delegator.rb', line 12

def config
  @config
end

#threadsafeObject (readonly)

Returns the value of attribute threadsafe.



12
13
14
# File 'lib/logstash/output_delegator.rb', line 12

def threadsafe
  @threadsafe
end

#workersObject (readonly)

Returns the value of attribute workers.



12
13
14
# File 'lib/logstash/output_delegator.rb', line 12

def workers
  @workers
end

Instance Method Details

#busy_workersObject

There’s no concept of ‘busy’ workers for a threadsafe plugin!



154
155
156
157
158
159
160
161
162
163
# File 'lib/logstash/output_delegator.rb', line 154

def busy_workers
  if @threadsafe
    0
  else
    # The pipeline reporter can run before the outputs are registered trying to pull a value here
    # In that case @worker_queue is empty, we just return 0
    return 0 unless @worker_queue
    @workers.size - @worker_queue.size
  end
end

#config_nameObject



60
61
62
# File 'lib/logstash/output_delegator.rb', line 60

def config_name
  @klass.config_name
end

#do_closeObject



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/logstash/output_delegator.rb', line 136

def do_close
  @logger.debug("closing output delegator", :klass => @klass)

  if @threadsafe
    @workers.each(&:do_close)
  else
    worker_count.times do
      worker = @worker_queue.pop
      worker.do_close
    end
  end
end

#events_receivedObject



149
150
151
# File 'lib/logstash/output_delegator.rb', line 149

def events_received
  @events_received.value
end

#registerObject

Raises:

  • (ArgumentError)


64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/logstash/output_delegator.rb', line 64

def register
  raise ArgumentError, "Attempted to register #{self} twice!" if @registered
  @registered = true
  # We define this as an array regardless of threadsafety
  # to make reporting simpler, even though a threadsafe plugin will just have
  # a single instance
  #
  # Older plugins invoke the instance method Outputs::Base#workers_not_supported
  # To detect these we need an instance to be created first :()
  # TODO: In the next major version after 2.x remove support for this
  @workers << @klass.new(@config)
  @workers.first.register # Needed in case register calls `workers_not_supported`

  @logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass)

  # Threadsafe versions don't need additional workers
  setup_additional_workers!(target_worker_count) unless @threadsafe
  # We skip the first worker because that's pre-registered to deal with legacy workers_not_supported
  @workers.subList(1,@workers.size).each(&:register)
  setup_multi_receive!
end

#setup_additional_workers!(target_worker_count) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/logstash/output_delegator.rb', line 86

def setup_additional_workers!(target_worker_count)
  warn_on_worker_override!

  (target_worker_count - 1).times do
    inst = @klass.new(@config)
    @workers << inst
  end

  # This queue is used to manage sharing across threads
  @worker_queue = SizedQueue.new(target_worker_count)
  @workers.each {|w| @worker_queue << w }
end

#setup_multi_receive!Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/logstash/output_delegator.rb', line 99

def setup_multi_receive!
  # One might wonder why we don't use something like
  # define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)
  # and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time!
  # The other 2% you get weird errors about rebinding to the same object
  # Until we switch to Jruby 9.x keep the define_singleton_method parts
  # the way they are, with a block
  # See https://github.com/jruby/jruby/issues/3582
  if threadsafe?
    @threadsafe_worker = @workers.first
    define_singleton_method(:multi_receive) do |events|
      threadsafe_multi_receive(events)
    end
  else
    define_singleton_method(:multi_receive) do |events|
      worker_multi_receive(events)
    end
  end
end

#target_worker_countObject

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
57
58
# File 'lib/logstash/output_delegator.rb', line 49

def target_worker_count
  # Remove in 5.0 after all plugins upgraded to use class level declarations
  raise ArgumentError, "Attempted to detect target worker count before instantiating a worker to test for legacy workers_not_supported!" if @workers.size == 0

  if @threadsafe || @klass.workers_not_supported?
    1
  else
    @config["workers"] || @default_worker_count
  end
end

#threadsafe?Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/logstash/output_delegator.rb', line 27

def threadsafe?
  !!@threadsafe
end

#threadsafe_multi_receive(events) ⇒ Object



119
120
121
122
123
# File 'lib/logstash/output_delegator.rb', line 119

def threadsafe_multi_receive(events)
  @events_received.increment(events.length)

  @threadsafe_worker.multi_receive(events)
end

#warn_on_worker_override!Object



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logstash/output_delegator.rb', line 31

def warn_on_worker_override!
  # The user has configured extra workers, but this plugin doesn't support it :(
  if worker_limits_overriden?
    message = @klass.workers_not_supported_message
    warning_meta = {:plugin => @klass.config_name, :worker_count => @config["workers"]}
    if message
      warning_meta[:message] = message
      @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", warning_meta))
    else
      @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", warning_meta))
    end
  end
end

#worker_countObject



165
166
167
# File 'lib/logstash/output_delegator.rb', line 165

def worker_count
  @workers.size
end

#worker_limits_overriden?Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/logstash/output_delegator.rb', line 45

def worker_limits_overriden?
  @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
end

#worker_multi_receive(events) ⇒ Object



125
126
127
128
129
130
131
132
133
134
# File 'lib/logstash/output_delegator.rb', line 125

def worker_multi_receive(events)
  @events_received.increment(events.length)

  worker = @worker_queue.pop
  begin
    worker.multi_receive(events)
  ensure
    @worker_queue.push(worker)
  end
end