Class: LogStash::Inputs::GoogleCloudStorage
- Inherits:
-
Base
- Object
- Base
- LogStash::Inputs::GoogleCloudStorage
- Defined in:
- lib/logstash/inputs/google_cloud_storage.rb
Overview
Generate a repeating message.
This plugin is intented only as an example.
Defined Under Namespace
Modules: SinceDB
Instance Method Summary collapse
- #backup_to_bucket(filename) ⇒ Object
- #backup_to_dir(filename) ⇒ Object
- #list_new_files ⇒ Object
- #process_files(queue) ⇒ Object
- #register ⇒ Object
-
#run(queue) ⇒ Object
def register.
- #stop ⇒ Object
Instance Method Details
#backup_to_bucket(filename) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 136 def backup_to_bucket(filename) unless @backup_to_bucket.nil? backup_key = "#{@backup_add_prefix}#{filename}" @logger.debug ("GCS input: bck_up object " + backup_key) result = @client.execute( api_method: @gcs.objects.copy, parameters: {destinationBucket: @backup_to_bucket, destinationObject:backup_key, sourceBucket: @bucket, sourceObject: filename } ) if @delete result = @client.execute( api_method: @gcs.objects.delete, parameters: { bucket: @bucket, object: filename } ) end end end |
#backup_to_dir(filename) ⇒ Object
157 158 159 160 161 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 157 def backup_to_dir(filename) unless @backup_to_dir.nil? FileUtils.cp(filename, @backup_to_dir) end end |
#list_new_files ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 104 def list_new_files @logger.debug("GCS input: Polling new objects from bucket "+ @bucket) objects = @client.execute( api_method: @gcs.objects.list, parameters: {bucket: @bucket} ) logFiles = {} objects.data.items.each do |file| unless ignore_filename?(file.name) if sincedb.newer?(file.updated) logFiles[file.name] = file.updated @logger.info("GCS input: Adding to objects[]", :name => file.name) @logger.debug("GCS input: objects[] length is: ", :length => logFiles.length) end end end return logFiles.keys.sort {|a,b| logFiles[a] <=> logFiles[b]} end |
#process_files(queue) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 165 def process_files(queue) objects = list_new_files objects.each do |filename| if stop? break else @logger.debug("GCS input: processing ", :bucket => @bucket, :filename => filename) process_log(queue, filename) end end end |
#register ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 64 def register @host = Socket.gethostname require "fileutils" require "digest/md5" require "google/api_client" require "openssl" @logger.debug("Registering GCS input", :bucket => @bucket, :project => @project, :keyfile => @keyfile) # initialize_google_client @logger.info ("\n===========================GCS INPUT PLUG-IN===========================") @client = Google::APIClient.new(:application_name => 'Logstash Google Cloud Storage input plugin', :application_version => '0.1') key = Google::APIClient::PKCS12.load_key(@keyfile, @key_password) service_account = Google::APIClient::JWTAsserter.new(@service_account, 'https://www.googleapis.com/auth/devstorage.read_write', key) @client. = service_account. @gcs = @client.discovered_api('storage', 'v1') unless @backup_to_dir.nil? Dir.mkdir(@backup_to_dir, 0700) unless File.exists?(@backup_to_dir) end FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory) end |
#run(queue) ⇒ Object
def register
94 95 96 97 98 99 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 94 def run(queue) @current_thread = Thread.current Stud.interval(@interval) do process_files(queue) end end |
#stop ⇒ Object
181 182 183 184 185 186 |
# File 'lib/logstash/inputs/google_cloud_storage.rb', line 181 def stop # @current_thread is initialized in the `#run` method, # this variable is needed because the `#stop` is a called in another thread # than the `#run` method and requiring us to call stop! with a explicit thread. Stud.stop!(@current_thread) end |