Class: LogStash::BasePipeline
- Inherits:
- Object
- Inheritance chain: Object → LogStash::BasePipeline
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#config_hash ⇒ Object
readonly
Returns the value of attribute config_hash.
-
#config_str ⇒ Object
readonly
Returns the value of attribute config_str.
-
#ephemeral_id ⇒ Object
readonly
Returns the value of attribute ephemeral_id.
-
#execution_context ⇒ Object
readonly
Returns the value of attribute execution_context.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#inputs ⇒ Object
readonly
Returns the value of attribute inputs.
-
#lir ⇒ Object
readonly
Returns the value of attribute lir.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
-
#pipeline_config ⇒ Object
readonly
Returns the value of attribute pipeline_config.
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Instance Method Summary collapse
- #close_dlq_writer ⇒ Object
- #compile_lir ⇒ Object
- #configured_as_reloadable? ⇒ Boolean
- #dlq_writer ⇒ Object
-
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #non_reloadable_plugins ⇒ Object
- #plugin(plugin_type, name, line, column, *args) ⇒ Object
- #reloadable? ⇒ Boolean
- #reloadable_plugins? ⇒ Boolean
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
Returns a new instance of BasePipeline.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/logstash/pipeline.rb', line 41 def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @logger = self.logger @mutex = Mutex.new @ephemeral_id = SecureRandom.uuid @pipeline_config = pipeline_config @config_str = pipeline_config.config_string @settings = pipeline_config.settings @config_hash = Digest::SHA1.hexdigest(@config_str) @lir = ConfigCompiler.configToPipelineIR( @config_str, @settings.get_value("config.support_escapes") ) @pipeline_id = @settings.get_value("pipeline.id") || self.object_id @inputs = nil @filters = nil @outputs = nil @agent = agent @dlq_writer = dlq_writer @plugin_factory = LogStash::Plugins::PluginFactory.new( # use NullMetric if called in the BasePipeline context otherwise use the @metric value @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new), @logger, LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer), FilterDelegator ) grammar = LogStashConfigParser.new parsed_config = grammar.parse(config_str) raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil? parsed_config.process_escape_sequences = settings.get_value("config.support_escapes") config_code = parsed_config.compile # config_code = BasePipeline.compileConfig(config_str) if settings.get_value("config.debug") && @logger.debug? @logger.debug("Compiled pipeline code", default_logging_keys(:code => config_code)) end # Evaluate the config compiled code that will initialize all the plugins and define the # filter and output methods. begin eval(config_code) rescue => e raise e end end |
Instance Attribute Details
#config_hash ⇒ Object (readonly)
Returns the value of attribute config_hash.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def config_hash @config_hash end |
#config_str ⇒ Object (readonly)
Returns the value of attribute config_str.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def config_str @config_str end |
#ephemeral_id ⇒ Object (readonly)
Returns the value of attribute ephemeral_id.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def ephemeral_id @ephemeral_id end |
#execution_context ⇒ Object (readonly)
Returns the value of attribute execution_context.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def execution_context @execution_context end |
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def filters @filters end |
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def inputs @inputs end |
#lir ⇒ Object (readonly)
Returns the value of attribute lir.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def lir @lir end |
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def outputs @outputs end |
#pipeline_config ⇒ Object (readonly)
Returns the value of attribute pipeline_config.
39 40 41 |
# File 'lib/logstash/pipeline.rb', line 39 def pipeline_config @pipeline_config end |
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def pipeline_id @pipeline_id end |
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def settings @settings end |
Instance Method Details
#close_dlq_writer ⇒ Object
100 101 102 103 104 105 |
# File 'lib/logstash/pipeline.rb', line 100
#
# Closes the current dead letter queue writer. When the
# "dead_letter_queue.enable" setting is truthy, it additionally calls
# DeadLetterQueueFactory.release with this pipeline's id.
#
# @return [Object, nil] the result of DeadLetterQueueFactory.release, or nil
#   when the dead letter queue setting is not enabled
def close_dlq_writer
  @dlq_writer.close
  return unless settings.get_value("dead_letter_queue.enable")

  DeadLetterQueueFactory.release(pipeline_id)
end
#compile_lir ⇒ Object
107 108 109 110 111 |
# File 'lib/logstash/pipeline.rb', line 107
#
# Compiles this pipeline's config string with the Java-side ConfigCompiler,
# passing along the "config.support_escapes" setting.
#
# @return [Object] whatever ConfigCompiler.configToPipelineIR returns
def compile_lir
  compiler = org.logstash.config.ir.ConfigCompiler
  source = config_str
  support_escapes = @settings.get_value("config.support_escapes")
  compiler.configToPipelineIR(source, support_escapes)
end
#configured_as_reloadable? ⇒ Boolean
121 122 123 |
# File 'lib/logstash/pipeline.rb', line 121
#
# Reports the value of the "pipeline.reloadable" setting.
#
# @return [Boolean] the value returned by settings.get("pipeline.reloadable")
def configured_as_reloadable?
  reloadable_setting = settings.get("pipeline.reloadable")
  reloadable_setting
end
#dlq_writer ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/logstash/pipeline.rb', line 92
#
# Obtains a dead letter queue writer, stores it in @dlq_writer and returns it.
# With "dead_letter_queue.enable" truthy the writer comes from
# DeadLetterQueueFactory.getWriter (pipeline id, "path.dead_letter_queue",
# "dead_letter_queue.max_bytes"); otherwise a new
# LogStash::Util::DummyDeadLetterQueueWriter is created.
# Every call performs the lookup/creation again and reassigns @dlq_writer.
#
# @return [Object] the writer that was just assigned to @dlq_writer
def dlq_writer
  dlq_enabled = settings.get_value("dead_letter_queue.enable")

  @dlq_writer =
    if dlq_enabled
      DeadLetterQueueFactory.getWriter(
        pipeline_id,
        settings.get_value("path.dead_letter_queue"),
        settings.get_value("dead_letter_queue.max_bytes")
      )
    else
      LogStash::Util::DummyDeadLetterQueueWriter.new
    end
end
#non_reloadable_plugins ⇒ Object
129 130 131 |
# File 'lib/logstash/pipeline.rb', line 129
#
# Collects every input, filter and output plugin (in that order) whose
# #reloadable? answer is falsy.
#
# @return [Array] the plugins that do not report themselves as reloadable
def non_reloadable_plugins
  all_plugins = inputs + filters + outputs
  all_plugins.reject(&:reloadable?)
end
#plugin(plugin_type, name, line, column, *args) ⇒ Object
113 114 115 |
# File 'lib/logstash/pipeline.rb', line 113
#
# Delegates plugin creation/lookup to @plugin_factory, forwarding every
# argument unchanged and in the same order.
#
# @param plugin_type [Object] forwarded as the first argument
# @param name [Object] forwarded as the second argument
# @param line [Object] forwarded as the third argument
# @param column [Object] forwarded as the fourth argument
# @param args [Array] any further arguments, forwarded after +column+
# @return [Object] whatever @plugin_factory.plugin returns
def plugin(plugin_type, name, line, column, *args)
  forwarded = [plugin_type, name, line, column, *args]
  @plugin_factory.plugin(*forwarded)
end
#reloadable? ⇒ Boolean
117 118 119 |
# File 'lib/logstash/pipeline.rb', line 117
#
# A pipeline counts as reloadable only when it is configured as reloadable
# and all of its plugins are reloadable. The plugin check is skipped when the
# configuration check is already falsy.
#
# @return [Boolean] the falsy configuration value, or else the result of
#   #reloadable_plugins?
def reloadable?
  configured = configured_as_reloadable?
  return configured unless configured

  reloadable_plugins?
end
#reloadable_plugins? ⇒ Boolean
125 126 127 |
# File 'lib/logstash/pipeline.rb', line 125
#
# Tells whether the list returned by #non_reloadable_plugins is empty.
#
# @return [Boolean] true when #non_reloadable_plugins returns an empty collection
def reloadable_plugins?
  blockers = non_reloadable_plugins
  blockers.empty?
end