Class: LogStash::Plugins::Builtin::Pipeline::Input

Inherits:
Inputs::Base
  • Object
show all
Defined in:
lib/logstash/plugins/builtin/pipeline/input.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#pipeline_bus ⇒ Object (readonly)

Returns the value of attribute pipeline_bus.



8
9
10
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 8

# Reader for the +@pipeline_bus+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@pipeline_bus+;
#   nil when nothing has been assigned yet
def pipeline_bus
  instance_variable_get(:@pipeline_bus)
end

Instance Method Details

#internalReceive(events) ⇒ Object

Returns false if the receive failed due to a stopping input. To understand why this value is useful, see Internal.send_to. Note: this takes a Java Stream, not a Ruby array.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 37

# Accepts events handed over by a sender and pushes each one onto this
# input's queue after decorating it.
#
# @param events [#forEach] a Java Stream of events (not a Ruby array); only
#   +forEach+ is invoked on it
# @return [Boolean] false when the input is not running (nothing is
#   enqueued), true once every event has been decorated and enqueued
# @raise [StandardError] any error raised while decorating or enqueueing is
#   propagated to the caller rather than being reported as a successful
#   receive
def internalReceive(events)
  return false unless @running.get

  # TODO: This should probably push a batch at some point in the future when
  # doing so buys us some efficiency
  events.forEach do |event|
    decorate(event)
    @queue << event
  end

  true
end

#isRunning ⇒ Object



59
60
61
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 59

# Reports the current value held by the +@running+ flag.
#
# @return [Object] the result of calling +get+ on +@running+
# @raise [NoMethodError] if +@running+ has not been assigned yet
def isRunning
  running_flag = @running
  running_flag.get
end

#register ⇒ Object



10
11
12
13
14
15
16
17
18
19
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 10

# Prepares the input: creates the running flag, looks up the pipeline bus
# through the execution context and starts listening on this input's address.
#
# @return [nil]
# @raise [LogStash::ConfigurationError] when the bus reports that listening
#   on the address did not succeed
def register
  # Create the flag right away, initially false. It only becomes true once
  # +run+ sets it, so events are refused until then.
  @running = java.util.concurrent.atomic.AtomicBoolean.new(false)
  @pipeline_bus = execution_context.agent.pipeline_bus
  return if pipeline_bus.listen(self, address)

  raise ::LogStash::ConfigurationError, "Internal input at '#{@address}' already bound! Addresses must be globally unique across pipelines."
end

#run(queue) ⇒ Object



21
22
23
24
25
26
27
28
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 21

# Stores the queue that received events are pushed to, marks the input as
# running, and then blocks until the running flag becomes false.
#
# @param queue [#<<] destination for received events; kept in +@queue+
# @return [nil]
def run(queue)
  @queue = queue
  @running.set(true)

  # Nothing to do on this thread except wait; poll the flag every 100ms.
  sleep 0.1 while @running.get
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 30

# Tells whether the input is currently running, tolerating a missing flag.
#
# @return [Boolean, nil] the value of +@running+ itself when it is nil/false,
#   otherwise the result of calling +get+ on it
def running?
  flag = @running
  return flag unless flag

  flag.get
end

#stop ⇒ Object



53
54
55
56
57
# File 'lib/logstash/plugins/builtin/pipeline/input.rb', line 53

# Stops the input: clears the running flag and detaches from the pipeline bus.
# Safe to call even if +register+ never ran, in which case neither the flag
# nor the bus exists and the method does nothing.
#
# @return [Object, nil] the result of the bus +unlisten+ call, or nil when no
#   bus has been assigned
def stop
  # We stop receiving events before we unlisten to prevent races
  @running.set(false) if @running # If register wasn't yet called, no @running!

  # Likewise, no bus has been assigned if register wasn't yet called, so
  # there is nothing to detach from.
  bus = pipeline_bus
  bus.unlisten(self, address) if bus
end