Class: LogStash::BasePipeline

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/pipeline.rb

Direct Known Subclasses

Pipeline

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Loggable

included, #logger, #slow_logger

Constructor Details

#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline

Returns a new instance of BasePipeline.

Raises:



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/logstash/pipeline.rb', line 41

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
  @logger = self.logger
  @mutex = Mutex.new
  @ephemeral_id = SecureRandom.uuid

  @pipeline_config = pipeline_config
  @config_str = pipeline_config.config_string
  @settings = pipeline_config.settings
  @config_hash = Digest::SHA1.hexdigest(@config_str)

  @lir = ConfigCompiler.configToPipelineIR(
    @config_str, @settings.get_value("config.support_escapes")
  )

  @pipeline_id = @settings.get_value("pipeline.id") || self.object_id

  @inputs = nil
  @filters = nil
  @outputs = nil
  @agent = agent

  @dlq_writer = dlq_writer

  @plugin_factory = LogStash::Plugins::PluginFactory.new(
    # use NullMetric if called in the BasePipeline context otherwise use the @metric value
    @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
    LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
    FilterDelegator
  )
  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  parsed_config.process_escape_sequences = settings.get_value("config.support_escapes")
  config_code = parsed_config.compile

  if settings.get_value("config.debug")
    @logger.debug("Compiled pipeline code", default_logging_keys(:code => config_code))
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  begin
    eval(config_code)
  rescue => e
    raise e
  end
end

Instance Attribute Details

#config_hashObject (readonly)

Returns the value of attribute config_hash.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def config_hash
  @config_hash
end

#config_strObject (readonly)

Returns the value of attribute config_str.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def config_str
  @config_str
end

#ephemeral_idObject (readonly)

Returns the value of attribute ephemeral_id.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def ephemeral_id
  @ephemeral_id
end

#execution_contextObject (readonly)

Returns the value of attribute execution_context.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def execution_context
  @execution_context
end

#filtersObject (readonly)

Returns the value of attribute filters.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def filters
  @filters
end

#inputsObject (readonly)

Returns the value of attribute inputs.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def inputs
  @inputs
end

#lirObject (readonly)

Returns the value of attribute lir.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def lir
  @lir
end

#outputsObject (readonly)

Returns the value of attribute outputs.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def outputs
  @outputs
end

#pipeline_configObject (readonly)

Returns the value of attribute pipeline_config.



39
40
41
# File 'lib/logstash/pipeline.rb', line 39

def pipeline_config
  @pipeline_config
end

#pipeline_idObject (readonly)

Returns the value of attribute pipeline_id.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def pipeline_id
  @pipeline_id
end

#settingsObject (readonly)

Returns the value of attribute settings.



38
39
40
# File 'lib/logstash/pipeline.rb', line 38

def settings
  @settings
end

Instance Method Details

#close_dlq_writerObject



98
99
100
101
102
103
# File 'lib/logstash/pipeline.rb', line 98

def close_dlq_writer
  @dlq_writer.close
  if settings.get_value("dead_letter_queue.enable")
    DeadLetterQueueFactory.release(pipeline_id)
  end
end

#compile_lirObject



105
106
107
108
109
# File 'lib/logstash/pipeline.rb', line 105

def compile_lir
  org.logstash.config.ir.ConfigCompiler.configToPipelineIR(
    self.config_str, @settings.get_value("config.support_escapes")
  )
end

#configured_as_reloadable?Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/logstash/pipeline.rb', line 119

def configured_as_reloadable?
  settings.get("pipeline.reloadable")
end

#dlq_writerObject



90
91
92
93
94
95
96
# File 'lib/logstash/pipeline.rb', line 90

def dlq_writer
  if settings.get_value("dead_letter_queue.enable")
    @dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes"))
  else
    @dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
  end
end

#non_reloadable_pluginsObject



127
128
129
# File 'lib/logstash/pipeline.rb', line 127

def non_reloadable_plugins
  (inputs + filters + outputs).select { |plugin| !plugin.reloadable? }
end

#plugin(plugin_type, name, line, column, *args) ⇒ Object



111
112
113
# File 'lib/logstash/pipeline.rb', line 111

def plugin(plugin_type, name, line, column, *args)
  @plugin_factory.plugin(plugin_type, name, line, column, *args)
end

#reloadable?Boolean

Returns:

  • (Boolean)


115
116
117
# File 'lib/logstash/pipeline.rb', line 115

def reloadable?
  configured_as_reloadable? && reloadable_plugins?
end

#reloadable_plugins?Boolean

Returns:

  • (Boolean)


123
124
125
# File 'lib/logstash/pipeline.rb', line 123

def reloadable_plugins?
  non_reloadable_plugins.empty?
end