Class: LogStash::Inputs::S3

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::AwsConfig::V2
Defined in:
lib/logstash/inputs/s3.rb

Overview

Stream events from files in an S3 bucket.

Each line from each file generates an event. Files ending in `.gz` are handled as gzip'ed files.

Defined Under Namespace

Modules: SinceDB

Instance Method Summary collapse

Instance Method Details

#backup_to_bucket(object) ⇒ Object



113
114
115
116
117
118
119
120
121
# File 'lib/logstash/inputs/s3.rb', line 113

# Copies +object+ into the backup bucket under the key
# "#{@backup_add_prefix}#{object.key}", then deletes the original when
# @delete is truthy. Does nothing when @backup_to_bucket is nil.
#
# @param object [#key, #bucket_name, #delete] the source object to back up
# @return [Object, nil] the result of +object.delete+ when a delete was
#   performed, otherwise nil
def backup_to_bucket(object)
  return if @backup_to_bucket.nil?

  destination = @backup_bucket.object("#{@backup_add_prefix}#{object.key}")
  destination.copy_from(:copy_source => "#{object.bucket_name}/#{object.key}")

  # The original is removed only after the copy call has returned.
  object.delete() if @delete
end

#backup_to_dir(filename) ⇒ Object



124
125
126
127
128
# File 'lib/logstash/inputs/s3.rb', line 124

# Copies +filename+ into the directory named by @backup_to_dir.
# Does nothing when @backup_to_dir is nil.
#
# @param filename [String] path of the local file to copy
def backup_to_dir(filename)
  return if @backup_to_dir.nil?

  FileUtils.cp(filename, @backup_to_dir)
end

#list_new_files ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/logstash/inputs/s3.rb', line 95

# Lists the keys under @prefix in @s3bucket that are not ignored by
# +ignore_filename?+ and whose last-modified value +sincedb.newer?+ accepts.
#
# @return [Array] the selected keys, ordered by ascending last-modified value
def list_new_files
  modified_at = {}

  @s3bucket.objects(:prefix => @prefix).each do |entry|
    @logger.debug("S3 input: Found key", :key => entry.key)

    next if ignore_filename?(entry.key)
    next unless sincedb.newer?(entry.last_modified)

    modified_at[entry.key] = entry.last_modified
    @logger.debug("S3 input: Adding to objects[]", :key => entry.key)
    @logger.debug("objects[] length is: ", :length => modified_at.length)
  end

  modified_at.keys.sort { |left, right| modified_at[left] <=> modified_at[right] }
end

#process_files(queue) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/logstash/inputs/s3.rb', line 131

# Passes every key returned by +list_new_files+ to +process_log+, in order,
# and stops early as soon as +stop?+ returns true.
#
# @param queue [Object] forwarded unchanged to +process_log+
def process_files(queue)
  list_new_files.each do |key|
    break if stop?

    @logger.debug("S3 input processing", :bucket => @bucket, :key => key)
    process_log(queue, key)
  end
end

#register ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/logstash/inputs/s3.rb', line 59

# Prepares the input before it runs: loads its libraries, resolves the source
# bucket into @s3bucket, and makes sure the optional backup bucket, the
# optional backup directory and the temporary directory exist.
#
# Reads @bucket, @region, @backup_to_bucket, @backup_to_dir and
# @temporary_directory; assigns @s3bucket and, when a backup bucket is
# configured, @backup_bucket.
def register
  require "fileutils"
  require "digest/md5"
  require "aws-sdk-resources"

  @logger.info("Registering s3 input", :bucket => @bucket, :region => @region)

  s3 = get_s3object

  @s3bucket = s3.bucket(@bucket)

  unless @backup_to_bucket.nil?
    @backup_bucket = s3.bucket(@backup_to_bucket)
    begin
      s3.client.head_bucket({ :bucket => @backup_to_bucket })
    rescue Aws::S3::Errors::NoSuchBucket, Aws::S3::Errors::NotFound
      # NOTE(review): head_bucket is a HEAD request, so a missing bucket is
      # expected to arrive as NotFound rather than NoSuchBucket; both are
      # handled so the bucket is created either way. Confirm against the SDK.
      s3.create_bucket({ :bucket => @backup_to_bucket })
    end
  end

  unless @backup_to_dir.nil?
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    Dir.mkdir(@backup_to_dir, 0700) unless File.exist?(@backup_to_dir)
  end

  FileUtils.mkdir_p(@temporary_directory) unless Dir.exist?(@temporary_directory)
end

#run(queue) ⇒ Object



87
88
89
90
91
92
# File 'lib/logstash/inputs/s3.rb', line 87

# Records the calling thread in @current_thread and then hands
# +process_files(queue)+ to +Stud.interval+ with @interval as its argument.
#
# @param queue [Object] forwarded unchanged to +process_files+
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval) { process_files(queue) }
end

#stop ⇒ Object



145
146
147
148
149
150
# File 'lib/logstash/inputs/s3.rb', line 145

# Asks Stud to stop the thread recorded in @current_thread.
def stop
  # @current_thread is assigned in `#run`. `#stop` is called from a different
  # thread than `#run`, so the thread to stop has to be passed to `Stud.stop!`
  # explicitly.
  runner_thread = @current_thread
  Stud.stop!(runner_thread)
end