Class: LogStash::Plugins::Builtin::Pipeline::Input

Inherits:
Inputs::Base
  • Object
show all
Defined in:
lib/logstash/plugins/builtin/pipeline/input.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#pipeline_busObject (readonly)

Returns the value of attribute pipeline_bus.



8
9
10
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 8

def pipeline_bus
  @pipeline_bus
end

Instance Method Details

#internalReceive(events) ⇒ Object

Returns false if the receive failed due to a stopping input To understand why this value is useful see Internal.send_to Note, this takes a java Stream, not a ruby array



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 37

def internalReceive(events)
  return false if !@running.get()

  # TODO This should probably push a batch at some point in the future when doing so
  # buys us some efficiency
  events.forEach do |event|
    decorate(event)
    @queue << event
  end

  true
end

#isRunningObject



57
58
59
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 57

def isRunning
  @running.get
end

#registerObject



10
11
12
13
14
15
16
17
18
19
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 10

def register
  # May as well set this up here, writers won't do anything until
  # @running is set to false
  @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
  @pipeline_bus = execution_context.agent.pipeline_bus
  listen_successful = pipeline_bus.listen(self, address)
  if !listen_successful
    raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
  end
end

#run(queue) ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 21

def run(queue)
  @queue = queue
  @running.set(true)

  while @running.get()
    sleep 0.1
  end
end

#running?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 30

def running?
  @running && @running.get()
end

#stopObject



50
51
52
53
54
55
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 50

def stop
  pipeline_bus.unlisten(self, address)
  # We stop receiving events _after_ we unlisten to pick up any events sent by upstream outputs that
  # have not yet stopped
  @running.set(false) if @running # If register wasn't yet called, no @running!
end