Class: LogStash::PipelineAction::Create

Inherits:
Base
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipeline_action/create.rb

Instance Method Summary collapse

Methods inherited from Base

#<=>, #inspect

Constructor Details

#initialize(pipeline_config, metric) ⇒ Create

We currently pass around the metric object again this is needed to correctly create a pipeline, in a future PR we could pass a factory to create the pipeline so we pass the logic to create the pipeline instead.



14
15
16
17
# File 'lib/logstash/pipeline_action/create.rb', line 14

def initialize(pipeline_config, metric)
  @pipeline_config = pipeline_config
  @metric = metric
end

Instance Method Details

#execute(agent, pipelines) ⇒ Object

The execute assume that the thread safety access of the pipeline is managed by the caller.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/logstash/pipeline_action/create.rb', line 33

def execute(agent, pipelines)
  pipeline =
    if @pipeline_config.settings.get_value("pipeline.java_execution")
      LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
    else
      agent.exclusive do
        # The Ruby pipeline initialization is not thread safe because of the module level
        # shared state in LogsStash::Config::AST. When using multiple pipelines this gets
        # executed simultaneously in different threads and we need to synchonize this initialization.
        LogStash::Pipeline.new(@pipeline_config, @metric, agent)
      end
    end

  status = nil
  pipelines.compute(pipeline_id) do |id,value|
    if value
      LogStash::ConvergeResult::ActionResult.create(self, true)
    end
    status = pipeline.start # block until the pipeline is correctly started or crashed
    pipeline # The pipeline is successfully started we can add it to the map
  end


  LogStash::ConvergeResult::ActionResult.create(self, status)
end

#execution_priorityObject

Make sure we execution system pipeline like the monitoring before any user defined pipelines, system pipeline register hooks into the system that will be triggered by the user defined pipeline.



26
27
28
29
# File 'lib/logstash/pipeline_action/create.rb', line 26

def execution_priority
  default_priority = super
  @pipeline_config.system? ? default_priority * -1 : default_priority
end

#pipeline_idObject



19
20
21
# File 'lib/logstash/pipeline_action/create.rb', line 19

def pipeline_id
  @pipeline_config.pipeline_id
end

#to_sObject



60
61
62
# File 'lib/logstash/pipeline_action/create.rb', line 60

def to_s
  "PipelineAction::Create<#{pipeline_id}>"
end