Class: LogStash::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipeline.rb

Direct Known Subclasses

Pipeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline

Returns a new instance of BasePipeline.

Raises:

  • (ConfigurationError)



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/pipeline.rb', line 35

# Builds a pipeline from a configuration string.
#
# The configuration is parsed with LogStashConfigParser, compiled to Ruby
# source, and that source is evaluated inside this constructor. According to
# the original inline comment, the evaluated code initializes the plugins and
# defines the filter and output methods.
#
# @param config_str [String] the pipeline configuration text; it is hashed with
#   SHA1 and handed to the config grammar
# @param settings [#get_value] settings lookup object (defaults to SETTINGS);
#   the keys read here are "pipeline.id", "dead_letter_queue.enable",
#   "path.dead_letter_queue", "dead_letter_queue.max_bytes" and "config.debug"
# @raise [ConfigurationError] if the grammar cannot parse +config_str+; the
#   message is the grammar's failure reason
def initialize(config_str, settings = SETTINGS)
  @logger = self.logger
  @config_str = config_str
  @config_hash = Digest::SHA1.hexdigest(@config_str)
  # Every time #plugin is invoked this is incremented to give each plugin
  # a unique id when auto-generating plugin ids
  @plugin_counter ||= 0

  # Fall back to this object's id when no "pipeline.id" setting is present.
  @pipeline_id = settings.get_value("pipeline.id") || self.object_id

  # Ids of the plugins created so far; #plugin uses it to reject duplicates.
  @plugins_by_id = {}
  @inputs = nil
  @filters = nil
  @outputs = nil

  if settings.get_value("dead_letter_queue.enable")
    @dlq_writer = DeadLetterQueueFactory.getWriter(
      pipeline_id,
      settings.get_value("path.dead_letter_queue"),
      settings.get_value("dead_letter_queue.max_bytes")
    )
  else
    @dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
  end

  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  config_code = parsed_config.compile

  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", :code => config_code)
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  #
  # The code runs with this method's binding, so the local variable names
  # above are visible to it and are deliberately left unchanged. Any error
  # raised by the evaluated code propagates to the caller as-is; the former
  # `rescue => e; raise e` wrapper re-raised the same exception and added nothing.
  #
  # NOTE(review): this executes Ruby generated from the configuration text, so
  # the configuration must come from a trusted source.
  eval(config_code)
end

Instance Attribute Details

#config_hash ⇒ Object (readonly)

Returns the value of attribute config_hash.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @config_hash, which the constructor sets to the
# SHA1 hex digest of the pipeline's configuration string.
def config_hash
  @config_hash
end

#config_str ⇒ Object (readonly)

Returns the value of attribute config_str.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @config_str, the configuration string that was
# passed to the constructor.
def config_str
  @config_str
end

#filters ⇒ Object (readonly)

Returns the value of attribute filters.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @filters.
#
# The constructor resets @filters to nil before evaluating the compiled
# configuration. NOTE(review): presumably the evaluated code then assigns the
# collection of filter plugins — confirm against the config compiler.
def filters
  @filters
end

#inputs ⇒ Object (readonly)

Returns the value of attribute inputs.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @inputs.
#
# The constructor resets @inputs to nil before evaluating the compiled
# configuration. NOTE(review): presumably the evaluated code then assigns the
# collection of input plugins — confirm against the config compiler.
def inputs
  @inputs
end

#outputs ⇒ Object (readonly)

Returns the value of attribute outputs.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @outputs.
#
# The constructor resets @outputs to nil before evaluating the compiled
# configuration. NOTE(review): presumably the evaluated code then assigns the
# collection of output plugins — confirm against the config compiler.
def outputs
  @outputs
end

#pipeline_id ⇒ Object (readonly)

Returns the value of attribute pipeline_id.



33
34
35
# File 'lib/logstash/pipeline.rb', line 33

# Read-only accessor for @pipeline_id. The constructor sets it to the
# "pipeline.id" setting, or to this object's object_id when that setting
# is nil or false.
def pipeline_id
  @pipeline_id
end

Instance Method Details

#non_reloadable_plugins ⇒ Object



122
123
124
# File 'lib/logstash/pipeline.rb', line 122

# Collects every input, filter and output plugin (in that order) whose
# #reloadable? answers false or nil.
#
# @return [Array] the plugins that report themselves as not reloadable
def non_reloadable_plugins
  every_plugin = inputs + filters + outputs
  every_plugin.reject { |candidate| candidate.reloadable? }
end

#plugin(plugin_type, name, *args) ⇒ Object

Raises:

  • (ConfigurationError)



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/logstash/pipeline.rb', line 78

# Instantiates one plugin for this pipeline.
#
# Increments the per-pipeline plugin counter, merges the option hashes,
# resolves (or auto-generates) the plugin id, rejects duplicate ids, and then
# builds the object matching +plugin_type+.
#
# @param plugin_type [String] "output" and "filter" are wrapped in their
#   delegators; any other value is treated as an input
# @param name [String] plugin name handed to Plugin.lookup
# @param args [Array<Hash>] option hashes, merged left to right (later keys win)
# @return [Object] an OutputDelegator, a FilterDelegator, or the input plugin
#   instance itself
# @raise [ConfigurationError] if a plugin with the same id was already created
def plugin(plugin_type, name, *args)
  @plugin_counter += 1

  # Fold the list of option hashes into one fresh hash.
  options = args.inject({}) { |merged, extra| merged.merge(extra) }

  # A missing or empty id is replaced by "<config hash>-<counter>", and the
  # generated value is written back so the plugin receives it as well.
  supplied_id = options["id"]
  if supplied_id.nil? || supplied_id.empty?
    id = "#{@config_hash}-#{@plugin_counter}"
    options["id"] = id
  else
    id = supplied_id
  end

  if @plugins_by_id[id]
    raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict"
  end
  @plugins_by_id[id] = true

  # use NullMetric if called in the BasePipeline context otherwise use the @metric value
  base_metric = @metric || Instrument::NullMetric.new

  plugins_metric = base_metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
  # Scope plugins of type 'input' to 'inputs'
  type_metric = plugins_metric.namespace(:"#{plugin_type}s")

  plugin_class = Plugin.lookup(plugin_type, name)
  context = ExecutionContext.new(self, id, plugin_class.config_name, @dlq_writer)

  if plugin_type == "output"
    return OutputDelegator.new(@logger, plugin_class, type_metric, context, OutputDelegatorStrategyRegistry.instance, options)
  end
  return FilterDelegator.new(@logger, plugin_class, type_metric, context, options) if plugin_type == "filter"

  # Everything else is an input: build it directly and attach its own
  # id-scoped metric plus the execution context.
  input = plugin_class.new(options)
  input_metric = type_metric.namespace(id.to_sym)
  input_metric.gauge(:name, input.config_name)
  input.metric = input_metric
  input.execution_context = context
  input
end

#reloadable? ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
# File 'lib/logstash/pipeline.rb', line 118

# Tells whether the pipeline can be reloaded, i.e. whether
# #non_reloadable_plugins comes back empty.
#
# @return [Boolean] true when no plugin blocks a reload
def reloadable?
  blockers = non_reloadable_plugins
  blockers.empty?
end