Class: LogStash::BasePipeline
- Inherits: Object
- Inheritance chain: Object → LogStash::BasePipeline
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline.rb
Direct Known Subclasses
Instance Attribute Summary
-
#config_hash ⇒ Object
readonly
Returns the value of attribute config_hash.
-
#config_str ⇒ Object
readonly
Returns the value of attribute config_str.
-
#ephemeral_id ⇒ Object
readonly
Returns the value of attribute ephemeral_id.
-
#execution_context ⇒ Object
readonly
Returns the value of attribute execution_context.
-
#filters ⇒ Object
readonly
Returns the value of attribute filters.
-
#inputs ⇒ Object
readonly
Returns the value of attribute inputs.
-
#lir ⇒ Object
readonly
Returns the value of attribute lir.
-
#outputs ⇒ Object
readonly
Returns the value of attribute outputs.
-
#pipeline_config ⇒ Object
readonly
Returns the value of attribute pipeline_config.
-
#pipeline_id ⇒ Object
readonly
Returns the value of attribute pipeline_id.
-
#settings ⇒ Object
readonly
Returns the value of attribute settings.
Instance Method Summary
- #close_dlq_writer ⇒ Object
- #compile_lir ⇒ Object
- #configured_as_reloadable? ⇒ Boolean
- #dlq_writer ⇒ Object
-
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
constructor
A new instance of BasePipeline.
- #non_reloadable_plugins ⇒ Object
- #plugin(plugin_type, name, line, column, *args) ⇒ Object
- #reloadable? ⇒ Boolean
- #reloadable_plugins? ⇒ Boolean
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize(pipeline_config, namespaced_metric = nil, agent = nil) ⇒ BasePipeline
Returns a new instance of BasePipeline.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/pipeline.rb', line 39
#
# Builds the pipeline state from +pipeline_config+: stores the config string,
# the settings and a SHA1 hex digest of the config string, compiles the config
# to LIR, creates the DLQ writer and the plugin factory, then parses the config
# string with LogStashConfigParser, compiles it to Ruby code and evaluates that
# code in the context of this instance.
#
# @param pipeline_config [Object] must respond to +config_string+ and +settings+
# @param namespaced_metric [Object, nil] accepted but not read by this method
# @param agent [Object, nil] stored in @agent and passed to the execution context factory
# @raise [ConfigurationError] when the grammar fails to parse the config string
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
  @logger = logger
  @mutex = Mutex.new
  @ephemeral_id = SecureRandom.uuid

  @pipeline_config = pipeline_config
  @config_str = pipeline_config.config_string
  @settings = pipeline_config.settings
  @config_hash = Digest::SHA1.hexdigest(@config_str)

  # Reuse the helper instead of repeating the ConfigCompiler call inline;
  # it reads config_str and @settings, both of which are assigned above.
  @lir = compile_lir

  # Fall back to this object's id when no "pipeline.id" is configured.
  @pipeline_id = @settings.get_value("pipeline.id") || object_id

  @inputs = nil
  @filters = nil
  @outputs = nil
  @agent = agent

  # dlq_writer needs settings and pipeline_id, so it must run after they are set.
  @dlq_writer = dlq_writer

  @plugin_factory = LogStash::Plugins::PluginFactory.new(
    # use NullMetric if called in the BasePipeline context otherwise use the @metric value
    @lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, @metric || Instrument::NullMetric.new),
    LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
    FilterDelegator
  )

  grammar = LogStashConfigParser.new
  parsed_config = grammar.parse(config_str)
  raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?

  parsed_config.process_escape_sequences = settings.get_value("config.support_escapes")
  config_code = parsed_config.compile

  if settings.get_value("config.debug")
    @logger.debug("Compiled pipeline code", default_logging_keys(:code => config_code))
  end

  # Evaluate the config compiled code that will initialize all the plugins and define the
  # filter and output methods.
  # NOTE(review): this evals code generated from the pipeline config string; it is only
  # as trustworthy as whoever can write that config. Errors propagate to the caller
  # unchanged (the previous rescue-and-re-raise wrapper was a no-op).
  eval(config_code)
end
Instance Attribute Details
#config_hash ⇒ Object (readonly)
Returns the value of attribute config_hash.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def config_hash @config_hash end |
#config_str ⇒ Object (readonly)
Returns the value of attribute config_str.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def config_str @config_str end |
#ephemeral_id ⇒ Object (readonly)
Returns the value of attribute ephemeral_id.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def ephemeral_id @ephemeral_id end |
#execution_context ⇒ Object (readonly)
Returns the value of attribute execution_context.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def execution_context @execution_context end |
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def filters @filters end |
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def inputs @inputs end |
#lir ⇒ Object (readonly)
Returns the value of attribute lir.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def lir @lir end |
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def outputs @outputs end |
#pipeline_config ⇒ Object (readonly)
Returns the value of attribute pipeline_config.
37 38 39 |
# File 'lib/logstash/pipeline.rb', line 37 def pipeline_config @pipeline_config end |
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def pipeline_id @pipeline_id end |
#settings ⇒ Object (readonly)
Returns the value of attribute settings.
36 37 38 |
# File 'lib/logstash/pipeline.rb', line 36 def settings @settings end |
Instance Method Details
#close_dlq_writer ⇒ Object
96 97 98 99 100 101 |
# File 'lib/logstash/pipeline.rb', line 96
#
# Closes the current DLQ writer and, when the "dead_letter_queue.enable"
# setting is truthy, also releases this pipeline's entry in
# DeadLetterQueueFactory.
def close_dlq_writer
  @dlq_writer.close

  # Nothing to release unless the dead letter queue is enabled.
  return unless settings.get_value("dead_letter_queue.enable")

  DeadLetterQueueFactory.release(pipeline_id)
end
#compile_lir ⇒ Object
103 104 105 106 107 |
# File 'lib/logstash/pipeline.rb', line 103 def compile_lir org.logstash.config.ir.ConfigCompiler.configToPipelineIR( self.config_str, @settings.get_value("config.support_escapes") ) end |
#configured_as_reloadable? ⇒ Boolean
117 118 119 |
# File 'lib/logstash/pipeline.rb', line 117 def configured_as_reloadable? settings.get("pipeline.reloadable") end |
#dlq_writer ⇒ Object
88 89 90 91 92 93 94 |
# File 'lib/logstash/pipeline.rb', line 88
#
# Assigns and returns @dlq_writer. When the "dead_letter_queue.enable" setting
# is truthy the writer comes from DeadLetterQueueFactory.getWriter, using the
# pipeline id plus the "path.dead_letter_queue" and
# "dead_letter_queue.max_bytes" settings; otherwise a new
# DummyDeadLetterQueueWriter is used. @dlq_writer is reassigned on every call.
def dlq_writer
  @dlq_writer =
    if settings.get_value("dead_letter_queue.enable")
      DeadLetterQueueFactory.getWriter(
        pipeline_id,
        settings.get_value("path.dead_letter_queue"),
        settings.get_value("dead_letter_queue.max_bytes")
      )
    else
      LogStash::Util::DummyDeadLetterQueueWriter.new
    end
end
#non_reloadable_plugins ⇒ Object
125 126 127 |
# File 'lib/logstash/pipeline.rb', line 125
#
# Returns the plugins among inputs, filters and outputs (concatenated in that
# order) whose reloadable? is falsy.
def non_reloadable_plugins
  all_plugins = inputs + filters + outputs
  all_plugins.reject { |candidate| candidate.reloadable? }
end
#plugin(plugin_type, name, line, column, *args) ⇒ Object
109 110 111 |
# File 'lib/logstash/pipeline.rb', line 109 def plugin(plugin_type, name, line, column, *args) @plugin_factory.plugin(plugin_type, name, line, column, *args) end |
#reloadable? ⇒ Boolean
113 114 115 |
# File 'lib/logstash/pipeline.rb', line 113 def reloadable? configured_as_reloadable? && reloadable_plugins? end |
#reloadable_plugins? ⇒ Boolean
121 122 123 |
# File 'lib/logstash/pipeline.rb', line 121 def reloadable_plugins? non_reloadable_plugins.empty? end |