Class: LogStash::PipelinesRegistry

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipelines_registry.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePipelinesRegistry

Returns a new instance of PipelinesRegistry.



32
33
34
35
36
37
38
39
# File 'lib/logstash/pipelines_registry.rb', line 32

def initialize
  # we leverage the semantic of the Java ConcurrentHashMap for the
  # compute() method which is atomic; calling compute() concurrently
  # will block until the other compute finishes so no mutex is necessary
  # for synchronizing compute calls
  @states = java.util.concurrent.ConcurrentHashMap.new
  @locks = java.util.concurrent.ConcurrentHashMap.new
end

Instance Attribute Details

#statesObject (readonly)

Returns the value of attribute states.



29
30
31
# File 'lib/logstash/pipelines_registry.rb', line 29

def states
  @states
end

Instance Method Details

#create_pipeline(pipeline_id, pipeline, &create_block) ⇒ Boolean

Execute the passed creation logic block and create a new state upon success

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • pipeline (Pipeline)

    the new pipeline to create

  • create_block (Block)

    the creation execution logic

Yield Returns:

  • (Boolean)

    the new pipeline creation success

Returns:

  • (Boolean)

    new pipeline creation success



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logstash/pipelines_registry.rb', line 49

def create_pipeline(pipeline_id, pipeline, &create_block)
  lock = get_lock(pipeline_id)
  lock.lock

  success = false

  state = @states.get(pipeline_id)
  if state
    if state.terminated?
      success = yield
      state.set_pipeline(pipeline)
    else
      logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
    end
    @states.put(pipeline_id, state)
  else
    success = yield
    @states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
  end

  success
ensure
  lock.unlock
end

#empty?Boolean

Returns true if the states collection is empty.

Returns:

  • (Boolean)

    true if the states collection is empty.



140
141
142
# File 'lib/logstash/pipelines_registry.rb', line 140

def empty?
  @states.isEmpty
end

#get_pipeline(pipeline_id) ⇒ Pipeline

Returns the pipeline object or nil if none for pipeline_id.

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

Returns:

  • (Pipeline)

    the pipeline object or nil if none for pipeline_id



129
130
131
132
# File 'lib/logstash/pipelines_registry.rb', line 129

def get_pipeline(pipeline_id)
  state = @states.get(pipeline_id)
  state.nil? ? nil : state.pipeline
end

#non_running_pipelinesHash{String=>Pipeline}

Returns:



150
151
152
# File 'lib/logstash/pipelines_registry.rb', line 150

def non_running_pipelines
  select_pipelines { |state| state.terminated? }
end

#reload_pipeline(pipeline_id, &reload_block) ⇒ Boolean

Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • reload_block (Block)

    the reloading execution logic

Yield Returns:

  • (Array<Boolean, Pipeline>)

    the new pipeline creation success and new pipeline object

Returns:

  • (Boolean)

    new pipeline creation success



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/logstash/pipelines_registry.rb', line 102

def reload_pipeline(pipeline_id, &reload_block)
  lock = get_lock(pipeline_id)
  lock.lock
  success = false

  state = @states.get(pipeline_id)
  if state.nil?
    logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
    @states.remove(pipeline_id)
  else
    state.set_reloading(true)
    begin
      success, new_pipeline = yield
      state.set_pipeline(new_pipeline)
    ensure
      state.set_reloading(false)
    end
    @states.put(pipeline_id, state)
  end

success
ensure
  lock.unlock
end

#running_pipelinesHash{String=>Pipeline}

Returns:



145
146
147
# File 'lib/logstash/pipelines_registry.rb', line 145

def running_pipelines
  select_pipelines { |state| !state.terminated? }
end

#running_user_defined_pipelinesHash{String=>Pipeline}

Returns:



155
156
157
# File 'lib/logstash/pipelines_registry.rb', line 155

def running_user_defined_pipelines
  select_pipelines { |state | !state.terminated? && !state.pipeline.system? }
end

#sizeFixnum

Returns number of items in the states collection.

Returns:

  • (Fixnum)

    number of items in the states collection



135
136
137
# File 'lib/logstash/pipelines_registry.rb', line 135

def size
  @states.size
end

#terminate_pipeline(pipeline_id, &stop_block) {|the| ... } ⇒ Object

Execute the passed termination logic block

Parameters:

  • pipeline_id (String, Symbol)

    the pipeline id

  • stop_block (Block)

    the termination execution logic

Yield Parameters:

  • the (Pipeline)

    pipeline to terminate



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/logstash/pipelines_registry.rb', line 79

def terminate_pipeline(pipeline_id, &stop_block)
  lock = get_lock(pipeline_id)
  lock.lock

  state = @states.get(pipeline_id)
  if state.nil?
    logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
    @states.remove(pipeline_id)
  else
    yield(state.pipeline)
    @states.put(pipeline_id, state)
  end
ensure
  lock.unlock
end