Class: LogStash::Inputs::S3
- Inherits: LogStash::Inputs::Base
  - Object
  - LogStash::Inputs::Base
  - LogStash::Inputs::S3
- Includes:
- PluginMixins::AwsConfig::V2
- Defined in:
- lib/logstash/inputs/s3.rb
Overview
Stream events from files in an S3 bucket.
Each line from each file generates an event. Files ending in `.gz` are handled as gzipped files.
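As a rough illustration of that behaviour (a minimal sketch, not the plugin's actual code path; the helper name and file path are hypothetical), reading one downloaded file amounts to:

require "zlib"

# Yield each line of a local file as one event, decompressing `.gz` files.
def each_event_line(path, &block)
  if path.end_with?(".gz")
    Zlib::GzipReader.open(path) { |gz| gz.each_line(&block) }
  else
    File.foreach(path, &block)
  end
end

each_event_line("/tmp/logstash/example.log.gz") { |line| puts line.chomp }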
Defined Under Namespace
Modules: SinceDB
Instance Method Summary
- #backup_to_bucket(object) ⇒ Object
- #backup_to_dir(filename) ⇒ Object
- #list_new_files ⇒ Object
- #process_files(queue) ⇒ Object
- #register ⇒ Object
- #run(queue) ⇒ Object
- #stop ⇒ Object
Instance Method Details
#backup_to_bucket(object) ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 113

def backup_to_bucket(object)
  unless @backup_to_bucket.nil?
    backup_key = "#{@backup_add_prefix}#{object.key}"
    @backup_bucket.object(backup_key).copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")
    if @delete
      object.delete()
    end
  end
end
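For reference, the same copy-then-optionally-delete pattern can be reproduced standalone with aws-sdk-resources; the region, bucket names, key, and prefix below are made up, and the final delete corresponds to the plugin's `delete => true` setting:

require "aws-sdk-resources"

s3     = Aws::S3::Resource.new(:region => "us-east-1")
source = s3.bucket("my-logs").object("2016/app.log")

# "processed/" stands in for the backup_add_prefix setting.
backup_key = "processed/#{source.key}"
target     = s3.bucket("my-logs-backup").object(backup_key)
target.copy_from(:copy_source => "#{source.bucket_name}/#{source.key}")

source.delete  # only performed by the plugin when `delete` is enabled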
#backup_to_dir(filename) ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 124

def backup_to_dir(filename)
  unless @backup_to_dir.nil?
    FileUtils.cp(filename, @backup_to_dir)
  end
end
#list_new_files ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 95

def list_new_files
  objects = {}

  @s3bucket.objects(:prefix => @prefix).each do |log|
    @logger.debug("S3 input: Found key", :key => log.key)

    unless ignore_filename?(log.key)
      if sincedb.newer?(log.last_modified)
        objects[log.key] = log.last_modified
        @logger.debug("S3 input: Adding to objects[]", :key => log.key)
        @logger.debug("objects[] length is: ", :length => objects.length)
      end
    end
  end

  return objects.keys.sort {|a,b| objects[a] <=> objects[b]}
end
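Note the return value: keys are sorted by their last_modified timestamp rather than alphabetically, so older objects come first. In isolation, with hypothetical data:

objects = {
  "logs/b.log" => Time.utc(2016, 1, 2),
  "logs/a.log" => Time.utc(2016, 1, 1),
}

objects.keys.sort { |a, b| objects[a] <=> objects[b] }
# => ["logs/a.log", "logs/b.log"]  (oldest first)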
#process_files(queue) ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 131

def process_files(queue)
  objects = list_new_files

  objects.each do |key|
    if stop?
      break
    else
      @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
      process_log(queue, key)
    end
  end
end
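The stop? check between objects keeps shutdown responsive: the object currently being processed finishes, but no new one is started. Schematically (the boolean below stands in for the stop? predicate inherited from the base class):

keys    = ["logs/a.log", "logs/b.log"]
stopped = false  # stands in for stop?

keys.each do |key|
  break if stopped
  puts "processing #{key}"
end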
#register ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 59

def register
  require "fileutils"
  require "digest/md5"
  require "aws-sdk-resources"

  @logger.info("Registering s3 input", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket})
    rescue Aws::S3::Errors::NoSuchBucket
      s3.create_bucket({ :bucket => @backup_to_bucket})
    end
  end

  unless @backup_to_dir.nil?
    Dir.mkdir(@backup_to_dir, 0700) unless File.exists?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end
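The least obvious step here is the backup-bucket handling: head_bucket is used as an existence probe, and the bucket is created on demand when the probe raises. The same check, standalone and with a made-up bucket name:

require "aws-sdk-resources"

s3          = Aws::S3::Resource.new(:region => "us-east-1")
backup_name = "my-logs-backup"

begin
  s3.client.head_bucket(:bucket => backup_name)  # raises if the bucket does not exist
rescue Aws::S3::Errors::NoSuchBucket
  s3.create_bucket(:bucket => backup_name)
end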
#run(queue) ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 87

def run(queue)
  @current_thread = Thread.current

  Stud.interval(@interval) do
    process_files(queue)
  end
end
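Stud.interval is the polling loop: the block runs, then the thread sleeps so iterations start roughly `interval` seconds apart. A minimal sketch with a hard-coded interval (the plugin takes the value from its `interval` setting):

require "stud/interval"

Stud.interval(60) do
  puts "polling the bucket at #{Time.now}"
end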
#stop ⇒ Object
# File 'lib/logstash/inputs/s3.rb', line 145

def stop
  # @current_thread is initialized in the `#run` method.
  # It is needed because `#stop` is called from a different thread
  # than `#run`, so we have to pass an explicit thread to `Stud.stop!`.
  Stud.stop!(@current_thread)
end
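Stud.stop! flags the given thread as stopped and wakes it, which lets a loop sleeping inside Stud.interval exit. A toy sketch of that hand-off (not the plugin's actual lifecycle):

require "stud/interval"

worker = Thread.new do
  Stud.interval(60) { puts "tick" }
end

sleep 1
Stud.stop!(worker)  # wake the worker so the interval loop can exit
worker.join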