Class: LogStash::Config::SourceLoader

Inherits:
Object
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/config/source_loader.rb

Defined Under Namespace

Classes: FailedFetch, SuccessfulFetch

Instance Method Summary collapse

Constructor Details

#initialize(settings = LogStash::SETTINGS) ⇒ SourceLoader

Returns a new instance of SourceLoader.



36
37
38
39
40
# File 'lib/logstash/config/source_loader.rb', line 36

def initialize(settings = LogStash::SETTINGS)
  @sources_lock = Mutex.new
  @sources = Set.new
  @settings = settings
end

Instance Method Details

#add_source(new_source) ⇒ Object



112
113
114
115
# File 'lib/logstash/config/source_loader.rb', line 112

def add_source(new_source)
  logger.debug("Adding source", :source => new_source.to_s)
  @sources_lock.synchronize { @sources << new_source }
end

#configure_sources(new_sources) ⇒ Object



106
107
108
109
110
# File 'lib/logstash/config/source_loader.rb', line 106

def configure_sources(new_sources)
  new_sources = Array(new_sources).to_set
  logger.debug("Configure sources", :sources => new_sources.collect(&:to_s))
  @sources_lock.synchronize { @sources = new_sources }
end

#fetchObject

This return a ConfigLoader object that will abstract the call to the different sources and will return multiples pipeline



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/config/source_loader.rb', line 44

def fetch
  sources_loaders = []

  sources do |source|
    sources_loaders << source if source.match?
  end

  if sources_loaders.empty?
    # This shouldn't happen with the settings object or with any external plugins.
    # but lets add a guard so we fail fast.
    @sources_lock.synchronize do
      logger.error "No source loaders matched! This shouldn't happen", :sources => @sources
    end
    raise LogStash::InvalidSourceLoaderSettingError, "Can't find an appropriate config loader with current settings"
  else
    begin
      pipeline_configs = sources_loaders
        .collect { |source| source.pipeline_configs }
        .compact
        .flatten

      duplicate_ids = find_duplicate_ids(pipeline_configs)

      if duplicate_ids.any?
        logger.debug("Fetching pipelines with duplicate ids", duplicate_ids.each { |k, v| v.collect(&:pipeline_id) } )
        return FailedFetch.new("Found duplicate ids in your source: #{duplicate_ids.keys.sort.join(", ")}")
      end

      if config_debug?
        pipeline_configs.each { |pipeline_config| pipeline_config.display_debug_information }
      end

      if pipeline_configs.empty?
        logger.error("No configuration found in the configured sources.")
      end

      SuccessfulFetch.new(pipeline_configs)
    rescue => e
      logger.error("Could not fetch all the sources", :exception => e.class, :message => e.message, :backtrace => e.backtrace)
      FailedFetch.new(e.message)
    end
  end
end

#remove_source(klass) ⇒ Object



100
101
102
103
104
# File 'lib/logstash/config/source_loader.rb', line 100

def remove_source(klass)
  @sources_lock.synchronize do
    @sources.delete_if { |source| source == klass || source.is_a?(klass) }
  end
end

#sourcesObject



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/logstash/config/source_loader.rb', line 88

def sources
  @sources_lock.synchronize do
    if block_given?
      @sources.each do |source|
        yield source
      end
    else
      @sources
    end
  end
end