Class: LogStash::Plugins::Builtin::Pipeline::Input

Inherits:
Inputs::Base show all
Defined in:
lib/logstash/plugins/builtin/pipeline/input.rb

Constant Summary

Constants included from Config::Mixin

Config::Mixin::PLUGIN_VERSION_0_9_0, Config::Mixin::PLUGIN_VERSION_1_0_0

Constants included from Util::SubstitutionVariables

Util::SubstitutionVariables::SUBSTITUTION_PLACEHOLDER_REGEX

Constants inherited from LogStash::Plugin

LogStash::Plugin::NL

Instance Attribute Summary collapse

Attributes inherited from Inputs::Base

#params, #threadable

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from LogStash::Plugin

#execution_context, #params

Instance Method Summary collapse

Methods inherited from Inputs::Base

#clone, #do_stop, #execution_context=, #initialize, plugin_type, #stop?, #tag

Methods included from Config::Mixin

#config_init, included

Methods included from Util::SubstitutionVariables

#deep_replace, #replace_placeholders

Methods included from Util::Loggable

included, #logger, #slow_logger

Methods inherited from LogStash::Plugin

#close, #config_name, #debug_info, #do_close, #eql?, #hash, #id, #initialize, #inspect, lookup, #metric, #metric=, #reloadable?, reloadable?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Inputs::Base

Instance Attribute Details

#pipeline_busObject (readonly)

Returns the value of attribute pipeline_bus.



8
9
10
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 8

def pipeline_bus
  @pipeline_bus
end

Instance Method Details

#internalReceive(events) ⇒ Object

Returns false if the receive failed due to a stopping input To understand why this value is useful see Internal.send_to Note, this takes a java Stream, not a ruby array



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 37

def internalReceive(events)
  return false if !@running.get()

  # TODO This should probably push a batch at some point in the future when doing so
  # buys us some efficiency
  events.forEach do |event|
    decorate(event)
    @queue << event
  end

  return true
rescue => e
  require 'pry'; binding.pry
  return true
end

#isRunningObject



59
60
61
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 59

def isRunning
  @running.get
end

#registerObject



10
11
12
13
14
15
16
17
18
19
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 10

def register
  # May as well set this up here, writers won't do anything until
  # @running is set to false
  @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
  @pipeline_bus = execution_context.agent.pipeline_bus
  listen_successful = pipeline_bus.listen(self, address)
  if !listen_successful
    raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
  end
end

#run(queue) ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 21

def run(queue)
  @queue = queue
  @running.set(true)

  while @running.get()
    sleep 0.1
  end
end

#running?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 30

def running?
  @running && @running.get()
end

#stopObject



53
54
55
56
57
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 53

def stop
  # We stop receiving events before we unlisten to prevent races
  @running.set(false) if @running # If register wasn't yet called, no @running!
  pipeline_bus.unlisten(self, address)
end