Class: LogStash::Outputs::S3

Inherits:
Base
  • Object
Includes:
PluginMixins::AwsConfig
Defined in:
lib/logstash/outputs/s3.rb

Overview

INFORMATION:

This plugin stores Logstash events in Amazon Simple Storage Service (Amazon S3). To use it you need AWS credentials and an S3 bucket. Make sure the credentials have permission to write files to the S3 bucket, and run Logstash with sufficient privileges (e.g. as a super user) to establish the connection.

The S3 plugin's behavior has a few subtleties, explained below.

The S3 output writes temporary files to “/opt/logstash/S3_temp/”. If needed, you can change this path at the start of the register method. These files follow a special naming scheme, for example:

ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

ls.s3 : identifies the logstash S3 plugin.

“ip-10-228-27-95” : the IP of the machine that wrote the file, useful when several Logstash instances write to the same bucket.

“2013-04-18T10.00” : the rotation timestamp, set whenever you specify time_file.

“tag_hello” : the event’s tag, so events with the same tag are collected in the same file.

“part0” : when size_file is set, additional parts are generated whenever file.size > size_file.
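As a rough sketch, assuming hypothetical local variables (these are illustrative, not the plugin's actual internals), such a name is assembled like this:

  host     = "ip-10-228-27-95"    # machine identifier
  stamp    = "2013-04-18T10.00"   # rotation timestamp from time_file
  tag      = "hello"              # event tag
  part     = 0                    # incremented once file.size > size_file
  filename = "ls.s3.#{host}.#{stamp}.tag_#{tag}.part#{part}.txt"
  # => "ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt"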

When a file is full, it is pushed to the bucket and then deleted from the temporary directory.
If a file is empty, it is deleted without being pushed.

The plugin has a system to restore previous temporary files if something crashes.

Note:

If you specify both size_file and time_file, a file is created for each tag (if specified). Whenever time_file elapses or a file's size exceeds size_file, the affected files are pushed to the S3 bucket and deleted from local disk.

If you specify time_file but not size_file, only one file is created for each tag (if specified). Whenever time_file elapses, the files are pushed to the S3 bucket and deleted from local disk.

If you specify size_file but not time_file, files are created for each tag (if specified). Whenever a file's size exceeds size_file, it is pushed to the S3 bucket and deleted from local disk.

If you specify neither size_file nor time_file, you get a curious mode: only one file is created for each tag (if specified), and it stays in the temporary directory without ever being pushed to the bucket until Logstash is restarted.

Usage: This is an example of a Logstash config:


output {
  s3 {
    access_key_id => "crazy_key"             (required)
    secret_access_key => "monkey_access_key" (required)
    endpoint_region => "eu-west-1"           (required)
    bucket => "boss_please_open_your_bucket" (required)
    size_file => 2048                        (optional)
    time_file => 5                           (optional)
    format => "plain"                        (optional)
    canned_acl => "private"                  (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private")
  }
}

Constant Summary

TEMPFILE_EXTENSION = "log.gz"
S3_INVALID_CHARACTERS = /[\^`><]/

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#page_counter ⇒ Object (readonly)

Returns the value of attribute page_counter.



# File 'lib/logstash/outputs/s3.rb', line 115

def page_counter
  @page_counter
end

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.



# File 'lib/logstash/outputs/s3.rb', line 116

def s3
  @s3
end

#tempfile ⇒ Object

Exposed attributes for testing purposes.



# File 'lib/logstash/outputs/s3.rb', line 114

def tempfile
  @tempfile
end

Instance Method Details

#aws_s3_config ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 118

def aws_s3_config
  @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
  @s3 = AWS::S3.new(aws_options_hash)
end

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 123

def aws_service_endpoint(region)
  # Make the deprecated endpoint_region work
  # TODO: (ph) Remove this after deprecation.

  if @endpoint_region
    region_to_use = @endpoint_region
  else
    region_to_use = @region
  end

  return {
    :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
  }
end
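Note that this helper reads @endpoint_region (deprecated) or @region rather than its region argument. A usage sketch, assuming @endpoint_region is unset (values illustrative):

  # With @region = "eu-west-1":
  aws_service_endpoint("eu-west-1")
  # => { :s3_endpoint => "s3-eu-west-1.amazonaws.com" }
  # With @region = "us-east-1", the generic endpoint is used instead:
  # => { :s3_endpoint => "s3.amazonaws.com" }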

#get_temporary_filename(directory, file, page_counter = 0) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 245

def get_temporary_filename(directory, file, page_counter = 0)
  # Just to make sure we don't over-write files from a 'concurrent' logstash instance;
  # this includes a node that was replaced and gets its part number reset
  rand_string = (0...8).map { (65 + rand(26)).chr }.join
  return "#{@temporary_directory}/#{directory}/#{file}.part-#{page_counter}.#{rand_string}.#{TEMPFILE_EXTENSION}"
end
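A usage sketch with illustrative arguments (the eight-letter random suffix differs on every call, and /opt/logstash/S3_temp stands in for @temporary_directory):

  get_temporary_filename("2013-04-18T10.00.tag_hello", "ls.s3.ip-10-228-27-95", 1)
  # => "/opt/logstash/S3_temp/2013-04-18T10.00.tag_hello/ls.s3.ip-10-228-27-95.part-1.KQWZXJVB.log.gz"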

#periodic_intervalObject



# File 'lib/logstash/outputs/s3.rb', line 240

def periodic_interval
  @time_file * 60
end
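time_file is given in minutes, so with the example configuration above (time_file => 5) the rotation timer fires every 300 seconds:

  periodic_interval   # with @time_file = 5
  # => 300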

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 253

def receive(event)
  return unless output?(event)
  @codec.encode(event)
end

#registerObject



# File 'lib/logstash/outputs/s3.rb', line 168

def register
  require "aws-sdk"
  # required if using ruby version < 2.0
  # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
  AWS.eager_autoload!(AWS::S3)

  workers_not_supported

  @s3 = aws_s3_config
  @upload_queue = Queue.new
  @file_rotation_lock = Mutex.new

  if @prefix && @prefix =~ S3_INVALID_CHARACTERS
    @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
    raise LogStash::ConfigurationError, "S3: prefix contains invalid characters"
  end

  if !Dir.exist?(@temporary_directory)
    FileUtils.mkdir_p(@temporary_directory)
  end

  @segregations = {}

  test_s3_write

  restore_from_crashes if @restore == true
  register_segregation("test/test")
  configure_periodic_rotation if time_file != 0
  @segregations.delete("test/test")
  # configure_upload_workers

  @codec.on_event do |event, encoded_event|
    handle_event(event, encoded_event)
  end
end

#restore ⇒ Object

IMPORTANT: if you run multiple instances of the S3 output, set “restore => true” on one of them and “restore => false” on the others. This is a workaround so that newly created files are not destroyed while the initial files are being restored. Note that if you do not set “restore => true”, files left over when Logstash crashes or is restarted are never sent to the bucket; this applies even if you run a single instance.



# File 'lib/logstash/outputs/s3.rb', line 97

config :restore, :validate => :boolean, :default => false
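A minimal sketch of the multi-instance setup described above (bucket name and remaining options are placeholders):

  # Instance A: the one allowed to restore leftover files after a crash
  output {
    s3 {
      bucket  => "my_bucket"
      restore => true
    }
  }

  # Instances B, C, ...: keep the default
  #   restore => false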

#restore_from_crashes ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 229

def restore_from_crashes
  @logger.debug("S3: is attempting to verify previous crashes...")

  Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
    name_file = File.basename(file)
    @logger.warn("S3: have found temporary file the upload process crashed, uploading file to S3.", :filename => name_file)
    move_file_to_bucket_async(file)
  end
end

#rotate_events_log?(segregation) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/s3.rb', line 259

def rotate_events_log?(segregation)
  @file_rotation_lock.synchronize do
    @segregations[segregation][:file].size > @size_file
  end
end

#teardown ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 271

def teardown
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @file_rotation_lock.synchronize do
    @tempfile.close unless @tempfile.nil? || @tempfile.closed?
  end
  finished
end

#test_s3_write ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 208

def test_s3_write
  @logger.debug("S3: Creating a test file on S3")

  test_filename = File.join(
    @temporary_directory,
    "logstash-programmatic-access-test-object-#{Time.now.to_i}"
  )

  File.open(test_filename, 'a') do |file|
    file.write('test')
  end

  begin
    write_on_bucket(test_filename)
    delete_on_bucket(test_filename)
  ensure
    File.delete(test_filename)
  end
end

#write_events_to_multiple_files? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/s3.rb', line 266

def write_events_to_multiple_files?
  @size_file > 0
end

#write_on_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 139

def write_on_bucket(file)
  # find and use the bucket
  bucket = @s3.buckets[@bucket]

  remote_filename = file.gsub(@temporary_directory, "").sub(/^\//, '')

  @logger.info("write_on_bucket: #{remote_filename}")

  File.open(file, 'r') do |fileIO|
    begin
      # prepare for write the file
      object = bucket.objects[remote_filename]
      object.write(fileIO, :acl => @canned_acl, :content_encoding => "gzip")
    rescue AWS::Errors::Base => error
      @logger.error("S3: AWS error", :error => error)
      raise LogStash::Error, "AWS Configuration Error, #{error}"
    end
  end

  @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket  => @bucket, :canned_acl => @canned_acl)
end
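The remote object key mirrors the temporary file's path with the local directory prefix stripped. An illustrative example, assuming @temporary_directory = "/opt/logstash/S3_temp":

  # file            = "/opt/logstash/S3_temp/2013-04-18T10.00.tag_hello/ls.s3.ip-10-228-27-95.part-0.ABCDEFGH.log.gz"
  # remote_filename = "2013-04-18T10.00.tag_hello/ls.s3.ip-10-228-27-95.part-0.ABCDEFGH.log.gz"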