Class: LogStash::BasePipeline
- Inherits:
-
Object
- Object
- LogStash::BasePipeline
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#config_hash ⇒ Object
readonly
Returns the value of attribute config_hash.
-
#config_str ⇒ Object
readonly
Returns the value of attribute config_str.
-
#ephemeral_id ⇒ Object
readonly
Returns the value of attribute ephemeral_id.
-
#execution_context ⇒ Object
readonly
Returns the value of attribute execution_context.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#inputs ⇒ Object
readonly
Returns the value of attribute inputs.
-
#lir ⇒ Object
readonly
Returns the value of attribute lir.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
-
#pipeline_config ⇒ Object
readonly
Returns the value of attribute pipeline_config.
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Instance Method Summary collapse
- #close_dlq_writer ⇒ Object
- #compile_lir ⇒ Object
- #configured_as_reloadable? ⇒ Boolean
- #dlq_writer ⇒ Object
-
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #non_reloadable_plugins ⇒ Object
- #plugin(plugin_type, name, line, column, *args) ⇒ Object
- #reloadable? ⇒ Boolean
- #reloadable_plugins? ⇒ Boolean
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
Returns a new instance of BasePipeline.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/logstash/pipeline.rb', line 41 def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @logger = self.logger @mutex = Mutex.new @ephemeral_id = SecureRandom.uuid @pipeline_config = pipeline_config @config_str = pipeline_config.config_string @settings = pipeline_config.settings @config_hash = Digest::SHA1.hexdigest(@config_str) @lir = ConfigCompiler.configToPipelineIR( @config_str, @settings.get_value("config.support_escapes") ) @pipeline_id = @settings.get_value("pipeline.id") || self.object_id @inputs = nil @filters = nil @outputs = nil @agent = agent @dlq_writer = dlq_writer @plugin_factory = LogStash::Plugins::PluginFactory.new( # use NullMetric if called in the BasePipeline context otherwise use the @metric value @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new), LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer), FilterDelegator ) grammar = LogStashConfigParser.new parsed_config = grammar.parse(config_str) raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil? parsed_config.process_escape_sequences = settings.get_value("config.support_escapes") config_code = parsed_config.compile if settings.get_value("config.debug") @logger.debug("Compiled pipeline code", default_logging_keys(:code => config_code)) end # Evaluate the config compiled code that will initialize all the plugins and define the # filter and output methods. begin eval(config_code) rescue => e raise e end end |
Instance Attribute Details
#config_hash ⇒ Object (readonly)
Returns the value of attribute config_hash.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def config_hash @config_hash end |
#config_str ⇒ Object (readonly)
Returns the value of attribute config_str.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def config_str @config_str end |
#ephemeral_id ⇒ Object (readonly)
Returns the value of attribute ephemeral_id.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def ephemeral_id @ephemeral_id end |
#execution_context ⇒ Object (readonly)
Returns the value of attribute execution_context.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def execution_context @execution_context end |
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def filters @filters end |
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def inputs @inputs end |
#lir ⇒ Object (readonly)
Returns the value of attribute lir.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def lir @lir end |
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def outputs @outputs end |
#pipeline_config ⇒ Object (readonly)
Returns the value of attribute pipeline_config.
39 40 41 |
# File 'lib/logstash/pipeline.rb', line 39 def pipeline_config @pipeline_config end |
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def pipeline_id @pipeline_id end |
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
38 39 40 |
# File 'lib/logstash/pipeline.rb', line 38 def settings @settings end |
Instance Method Details
#close_dlq_writer ⇒ Object
98 99 100 101 102 103 |
# File 'lib/logstash/pipeline.rb', line 98 def close_dlq_writer @dlq_writer.close if settings.get_value("dead_letter_queue.enable") DeadLetterQueueFactory.release(pipeline_id) end end |
#compile_lir ⇒ Object
105 106 107 108 109 |
# File 'lib/logstash/pipeline.rb', line 105 def compile_lir org.logstash.config.ir.ConfigCompiler.configToPipelineIR( self.config_str, @settings.get_value("config.support_escapes") ) end |
#configured_as_reloadable? ⇒ Boolean
119 120 121 |
# File 'lib/logstash/pipeline.rb', line 119 def configured_as_reloadable? settings.get("pipeline.reloadable") end |
#dlq_writer ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/logstash/pipeline.rb', line 90 def dlq_writer if settings.get_value("dead_letter_queue.enable") @dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes")) else @dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new end end |
#non_reloadable_plugins ⇒ Object
127 128 129 |
# File 'lib/logstash/pipeline.rb', line 127 def non_reloadable_plugins (inputs + filters + outputs).select { |plugin| !plugin.reloadable? } end |
#plugin(plugin_type, name, line, column, *args) ⇒ Object
111 112 113 |
# File 'lib/logstash/pipeline.rb', line 111 def plugin(plugin_type, name, line, column, *args) @plugin_factory.plugin(plugin_type, name, line, column, *args) end |
#reloadable? ⇒ Boolean
115 116 117 |
# File 'lib/logstash/pipeline.rb', line 115 def reloadable? configured_as_reloadable? && reloadable_plugins? end |
#reloadable_plugins? ⇒ Boolean
123 124 125 |
# File 'lib/logstash/pipeline.rb', line 123 def reloadable_plugins? non_reloadable_plugins.empty? end |