Class: LogStash::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipeline.rb

Direct Known Subclasses

Pipeline

Constant Summary collapse

RELOAD_INCOMPATIBLE_PLUGINS =
[
    "LogStash::Inputs::Stdin"
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline

Returns a new instance of BasePipeline.

Raises:



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/logstash/pipeline.rb', line 34

def initialize(config_str, settings = SETTINGS)
  @logger = self.logger
  @config_str = config_str
  @config_hash = Digest::SHA1.hexdigest(@config_str)
  # Every time #plugin is invoked this is incremented to give each plugin
  # a unique id when auto-generating plugin ids
  @plugin_counter ||= 0

  @pipeline_id = settings.get_value("pipeline.id") || self.object_id

  # A list of plugins indexed by id
  @plugins_by_id = {}
  @inputs = nil
  @filters = nil
  @outputs = nil

  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  config_code = parsed_config.compile

  # config_code = BasePipeline.compileConfig(config_str)

  if settings.get_value("config.debug") && @logger.debug?
    @logger.debug("Compiled pipeline code", :code => config_code)
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  begin
    eval(config_code)
  rescue => e
    raise e
  end
end

Instance Attribute Details

#config_hashObject (readonly)

Returns the value of attribute config_hash.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def config_hash
  @config_hash
end

#config_strObject (readonly)

Returns the value of attribute config_str.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def config_str
  @config_str
end

#filtersObject (readonly)

Returns the value of attribute filters.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def filters
  @filters
end

#inputsObject (readonly)

Returns the value of attribute inputs.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def inputs
  @inputs
end

#outputsObject (readonly)

Returns the value of attribute outputs.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def outputs
  @outputs
end

#pipeline_idObject (readonly)

Returns the value of attribute pipeline_id.



28
29
30
# File 'lib/logstash/pipeline.rb', line 28

def pipeline_id
  @pipeline_id
end

Instance Method Details

#non_reloadable_pluginsObject



108
109
110
111
112
# File 'lib/logstash/pipeline.rb', line 108

def non_reloadable_plugins
  (inputs + filters + outputs).select do |plugin|
    RELOAD_INCOMPATIBLE_PLUGINS.include?(plugin.class.name)
  end
end

#plugin(plugin_type, name, *args) ⇒ Object

Raises:



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/logstash/pipeline.rb', line 71

def plugin(plugin_type, name, *args)
  @plugin_counter += 1

  # Collapse the array of arguments into a single merged hash
  args = args.reduce({}, &:merge)

  id = if args["id"].nil? || args["id"].empty?
         args["id"] = "#{@config_hash}-#{@plugin_counter}"
       else
         args["id"]
       end

  raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]

  @plugins_by_id[id] = true

  # use NullMetric if called in the BasePipeline context otherwise use the @metric value
  metric = @metric || Instrument::NullMetric.new

  pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])

  # Scope plugins of type 'input' to 'inputs'
  type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)

  klass = Plugin.lookup(plugin_type, name)

  if plugin_type == "output"
    OutputDelegator.new(@logger, klass, type_scoped_metric, OutputDelegatorStrategyRegistry.instance,  args)
  elsif plugin_type == "filter"
    FilterDelegator.new(@logger, klass, type_scoped_metric, args)
  else # input
    input_plugin = klass.new(args)
    input_plugin.metric = type_scoped_metric.namespace(id)
    input_plugin
  end
end