Class: LogStash::BasePipeline
- Inherits:
- 
      Object
      
        - Object
- LogStash::BasePipeline
 
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/pipeline.rb
Direct Known Subclasses
Instance Attribute Summary collapse
- 
  
    
      #config_hash  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute config_hash. 
- 
  
    
      #config_str  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute config_str. 
- 
  
    
      #filters  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute filters. 
- 
  
    
      #inputs  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute inputs. 
- 
  
    
      #outputs  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute outputs. 
- 
  
    
      #pipeline_id  ⇒ Object 
    
    
  
  
  
  
    
      readonly
    
    
  
  
  
  
  
  
    Returns the value of attribute pipeline_id. 
Instance Method Summary collapse
- 
  
    
      #initialize(config_str, settings = SETTINGS)  ⇒ BasePipeline 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of BasePipeline. 
- #non_reloadable_plugins ⇒ Object
- #plugin(plugin_type, name, *args) ⇒ Object
- #reloadable? ⇒ Boolean
Methods included from Util::Loggable
included, #logger, #slow_logger
Constructor Details
#initialize(config_str, settings = SETTINGS) ⇒ BasePipeline
Returns a new instance of BasePipeline.
| 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | # File 'lib/logstash/pipeline.rb', line 35 def initialize(config_str, settings = SETTINGS) @logger = self.logger @config_str = config_str @config_hash = Digest::SHA1.hexdigest(@config_str) # Every time #plugin is invoked this is incremented to give each plugin # a unique id when auto-generating plugin ids @plugin_counter ||= 0 @pipeline_id = settings.get_value("pipeline.id") || self.object_id # A list of plugins indexed by id @plugins_by_id = {} @inputs = nil @filters = nil @outputs = nil if settings.get_value("dead_letter_queue.enable") @dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes")) else @dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new end grammar = LogStashConfigParser.new parsed_config = grammar.parse(config_str) raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil? config_code = parsed_config.compile # config_code = BasePipeline.compileConfig(config_str) if settings.get_value("config.debug") && @logger.debug? @logger.debug("Compiled pipeline code", :code => config_code) end # Evaluate the config compiled code that will initialize all the plugins and define the # filter and output methods. begin eval(config_code) rescue => e raise e end end | 
Instance Attribute Details
#config_hash ⇒ Object (readonly)
Returns the value of attribute config_hash.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def config_hash @config_hash end | 
#config_str ⇒ Object (readonly)
Returns the value of attribute config_str.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def config_str @config_str end | 
#filters ⇒ Object (readonly)
Returns the value of attribute filters.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def filters @filters end | 
#inputs ⇒ Object (readonly)
Returns the value of attribute inputs.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def inputs @inputs end | 
#outputs ⇒ Object (readonly)
Returns the value of attribute outputs.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def outputs @outputs end | 
#pipeline_id ⇒ Object (readonly)
Returns the value of attribute pipeline_id.
| 33 34 35 | # File 'lib/logstash/pipeline.rb', line 33 def pipeline_id @pipeline_id end | 
Instance Method Details
#non_reloadable_plugins ⇒ Object
| 122 123 124 | # File 'lib/logstash/pipeline.rb', line 122 def non_reloadable_plugins (inputs + filters + outputs).select { |plugin| !plugin.reloadable? } end | 
#plugin(plugin_type, name, *args) ⇒ Object
| 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | # File 'lib/logstash/pipeline.rb', line 78 def plugin(plugin_type, name, *args) @plugin_counter += 1 # Collapse the array of arguments into a single merged hash args = args.reduce({}, &:merge) id = if args["id"].nil? || args["id"].empty? args["id"] = "#{@config_hash}-#{@plugin_counter}" else args["id"] end raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id] @plugins_by_id[id] = true # use NullMetric if called in the BasePipeline context otherwise use the @metric value metric = @metric || Instrument::NullMetric.new pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins]) # Scope plugins of type 'input' to 'inputs' type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym) klass = Plugin.lookup(plugin_type, name) execution_context = ExecutionContext.new(self, id, klass.config_name, @dlq_writer) if plugin_type == "output" OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args) elsif plugin_type == "filter" FilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args) else # input input_plugin = klass.new(args) scoped_metric = type_scoped_metric.namespace(id.to_sym) scoped_metric.gauge(:name, input_plugin.config_name) input_plugin.metric = scoped_metric input_plugin.execution_context = execution_context input_plugin end end | 
#reloadable? ⇒ Boolean
| 118 119 120 | # File 'lib/logstash/pipeline.rb', line 118 def reloadable? non_reloadable_plugins.empty? end |