Class: LogStash::Inputs::GoogleCloudStorage

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/inputs/google_cloud_storage.rb

Overview

GoogleCloudStorage is an input plugin for Logstash that reads blobs in Cloud Storage buckets.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#event_output_queue ⇒ Object

Accessors for testing



36
37
38
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 36

# Returns the current value of @event_output_queue, which #run sets to the
# queue it is given.
def event_output_queue
  @event_output_queue
end

#processed_db ⇒ Object (readonly)

Returns the value of attribute processed_db.



37
38
39
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 37

# Returns the current value of @processed_db, the ProcessedDb instance that
# #register builds from @processed_db_path.
def processed_db
  @processed_db
end

Instance Method Details

#list_download_process ⇒ Object

Fetches new files that are ready to be processed, downloads and processes each one, and finally runs the post-processing steps for it.



67
68
69
70
71
72
73
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 67

# Walks every blob yielded by list_processable_blobs and, for each one in
# turn: logs its URI, hands it to download_and_process, then to postprocess.
#
# Returns whatever list_processable_blobs returns.
def list_download_process
  list_processable_blobs do |candidate|
    @logger.info("Found matching blob #{candidate.uri}")

    # Post-processing only runs once download/processing has returned.
    download_and_process(candidate)
    postprocess(candidate)
  end
end

#register ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 39

# Prepares the plugin before #run is called:
#
# 1. makes sure @temp_directory exists,
# 2. builds the Cloud Storage client from @bucket_id and @json_key_file,
# 3. resolves @processed_db_path (defaulting to
#    <path.data>/plugins/inputs/google_cloud_storage/db when it is nil, with
#    path.data read from LogStash::SETTINGS) and opens the ProcessedDb there,
# 4. builds the BlobFilter from the match/exclude/metadata settings and the
#    ProcessedDb.
#
# Raises whatever FileUtils.mkdir_p raises if the directory cannot be created.
def register
  # mkdir_p already succeeds when the directory exists, so a separate
  # Dir.exist? guard is redundant and only opens a check-then-create race.
  FileUtils.mkdir_p(@temp_directory)

  @client = LogStash::Inputs::CloudStorage::Client.new(@bucket_id, @json_key_file, @logger)

  if @processed_db_path.nil?
    data_root = LogStash::SETTINGS.get_value('path.data')
    @processed_db_path = File.join(data_root, 'plugins', 'inputs', 'google_cloud_storage', 'db')
  end

  @logger.info("ProcessedDb created in: #{@processed_db_path}")

  @processed_db = LogStash::Inputs::CloudStorage::ProcessedDb.new(@processed_db_path)

  @blob_filter = LogStash::Inputs::CloudStorage::BlobFilter.new(
    @logger, @file_matches, @file_exclude, @metadata_key, @processed_db
  )
end

#run(queue) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 56

# Main loop of the input.
#
# Records the calling thread in @main_plugin_thread (which #stop later passes
# to Stud.stop!) and stores +queue+ in @event_output_queue, then hands
# Stud.interval a block that calls list_download_process, with @interval as
# the interval argument.
#
# @param queue [Object] the queue stored in @event_output_queue
# @return [Object] whatever Stud.interval returns
def run(queue)
  @main_plugin_thread = Thread.current
  @event_output_queue = queue

  Stud.interval(@interval) { list_download_process }
end

#stop ⇒ Object



75
76
77
78
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 75

# Asks Stud to stop the work associated with the thread that #run recorded in
# @main_plugin_thread.
def stop
  # Stud events were started on the main plugin thread so stop all events relative to it.
  # NOTE(review): @main_plugin_thread is only assigned in #run; behavior when
  # #stop is called before #run depends on how Stud.stop! handles nil — confirm.
  Stud.stop!(@main_plugin_thread)
end