Class: LogStash::PipelinesRegistry

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipelines_registry.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePipelinesRegistry

Returns a new instance of PipelinesRegistry.



32
33
34
35
36
37
38
# File 'lib/logstash/pipelines_registry.rb', line 32

def initialize
  # we leverage the semantic of the Java ConcurrentHashMap for the
  # compute() method which is atomic; calling compute() concurrently
  # will block until the other compute finishes so no mutex is necessary
  # for synchronizing compute calls
  @states = java.util.concurrent.ConcurrentHashMap.new
end

Instance Attribute Details

#statesObject (readonly)

Returns the value of attribute states.



29
30
31
# File 'lib/logstash/pipelines_registry.rb', line 29

def states
  @states
end

Instance Method Details

#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean

Execute the passed creation logic block and create a new state upon success

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • pipeline (Pipeline)

    the new pipeline to create

  • create_block (Block)

    the creation execution logic

Yield Returns:

  • (Boolean)

    the new pipeline creation success

Returns:

  • (Boolean)

    new pipeline creation success



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/logstash/pipelines_registry.rb', line 48

def create_pipeline(pipeline_id, pipeline, &create_block)
  success = false

  @states.compute(pipeline_id) do |_, state|
    if state
      if state.terminated?
        success = yield
        state.set_pipeline(pipeline)
      else
        logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
      end
      state
    else
      success = yield
      success ? PipelineState.new(pipeline_id, pipeline) : nil
    end
  end

  success
end

#empty?Boolean

Returns true if the states collection is empty.

Returns:

  • (Boolean)

    true if the states collection is empty.



128
129
130
# File 'lib/logstash/pipelines_registry.rb', line 128

def empty?
  @states.isEmpty
end

#get_pipeline(pipeline_id) ⇒ Pipeline

Returns the pipeline object or nil if none for pipeline_id.

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

Returns:

  • (Pipeline)

    the pipeline object or nil if none for pipeline_id



117
118
119
120
# File 'lib/logstash/pipelines_registry.rb', line 117

def get_pipeline(pipeline_id)
  state = @states.get(pipeline_id)
  state.nil? ? nil : state.pipeline
end

#non_running_pipelinesHash{String=>Pipeline}

Returns:



138
139
140
# File 'lib/logstash/pipelines_registry.rb', line 138

def non_running_pipelines
  select_pipelines { |state| state.terminated? }
end

#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean

Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • reload_block (Block)

    the reloading execution logic

Yield Returns:

  • (Array<Boolean, Pipeline>)

    the new pipeline creation success and new pipeline object

Returns:

  • (Boolean)

    new pipeline creation success



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/pipelines_registry.rb', line 93

def reload_pipeline(pipeline_id, &reload_block)
  success = false

  @states.compute(pipeline_id) do |_, state|
    if state.nil?
      logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
      nil
    else
      state.set_reloading(true)
      begin
        success, new_pipeline = yield
        state.set_pipeline(new_pipeline)
      ensure
        state.set_reloading(false)
      end
      state
    end
  end

  success
end

#running_pipelinesHash{String=>Pipeline}

Returns:



133
134
135
# File 'lib/logstash/pipelines_registry.rb', line 133

def running_pipelines
  select_pipelines { |state| !state.terminated? }
end

#running_user_defined_pipelinesHash{String=>Pipeline}

Returns:



143
144
145
# File 'lib/logstash/pipelines_registry.rb', line 143

def running_user_defined_pipelines
  select_pipelines { |state | !state.terminated? && !state.pipeline.system? }
end

#sizeFixnum

Returns number of items in the states collection.

Returns:

  • (Fixnum)

    number of items in the states collection



123
124
125
# File 'lib/logstash/pipelines_registry.rb', line 123

def size
  @states.size
end

#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object

Execute the passed termination logic block

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • stop_block (Block)

    the termination execution logic

Yield Parameters:

  • the (Pipeline)

    pipeline to terminate



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/logstash/pipelines_registry.rb', line 74

def terminate_pipeline(pipeline_id, &stop_block)
  @states.compute(pipeline_id) do |_, state|
    if state.nil?
      logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
      nil
    else
      yield(state.pipeline)
      state
    end
  end
end