Class: LogStash::Outputs::S3

Inherits:
Base
  • Object
Includes:
PluginMixins::AwsConfig
Defined in:
lib/logstash/outputs/s3.rb

Overview

INFORMATION:

This plugin sends Logstash events to Amazon Simple Storage Service (Amazon S3). To use it you need write permissions and a valid S3 bucket. Make sure you have permission to write files to the bucket. Also be sure to run Logstash as superuser to establish a connection.

This plugin writes temporary files to “/opt/logstash/S3_temp/”. You can change the path at the start of the register method. These files have a special name, for example:

ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

“ls.s3” : indicates the Logstash S3 plugin.
“ip-10-228-27-95” : the hostname of your machine (here it contains the machine's IP).
“2013-04-18T10.00” : the time at which the file was created, used whenever you specify time_file.
“tag_hello” : the event’s tag.
“part0” : the part number. If you specify size_file, more parts are generated when file.size > size_file.
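The naming scheme above can be illustrated by taking a file name apart again. This is only a sketch: `parse_temporary_filename` is a hypothetical helper that does not exist in the plugin, and the pattern assumes the `txt` extension and the timestamp layout shown in the example.

```ruby
# Hypothetical helper (not part of the plugin): split a temporary file name
# into the components described above. Returns nil if the name does not match.
def parse_temporary_filename(name)
  pattern = /
    \Als\.s3\.
    (?<host>.+?)\.
    (?<time>\d{4}-\d{2}-\d{2}T\d{2}\.\d{2})
    (?:\.tag_(?<tags>.+?))?
    \.part(?<part>\d+)
    \.txt\z
  /x

  match = pattern.match(name)
  return nil unless match

  {
    host: match[:host],
    time: match[:time],
    tags: match[:tags] ? match[:tags].split(".") : [],
    part: match[:part].to_i
  }
end

parts = parse_temporary_filename("ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt")
puts parts.inspect
```

Because several tags are joined with dots in the file name, the sketch splits the tag segment on "." to recover them.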

When a file is full it is pushed to the bucket and deleted from the temporary directory.
An empty file is not pushed, although move_file_to_bucket still deletes it from the temporary directory.

This plugin has a mechanism to restore the previous temporary files if something crashes.

Note:

If you specify both size_file and time_file, the plugin creates a file for each tag (if specified). When time_file elapses or a file's size exceeds size_file, the files are pushed to the S3 bucket and deleted from local disk. If you specify time_file but not size_file, it creates only one file for each tag (if specified). When time_file elapses, the files are pushed to the S3 bucket and deleted from local disk.

If you specify size_file but not time_file, it creates files for each tag (if specified). When a file's size exceeds size_file, it is pushed to the S3 bucket and deleted from local disk.

If you specify neither size_file nor time_file, you get a curious mode: the plugin creates only one file for each tag (if specified). The file stays in the temporary directory and is not pushed to the bucket until Logstash is restarted.
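The four combinations in this note can be summarised as a small decision function. This is a sketch, not plugin code: `rotation_mode` and the returned symbols are hypothetical names, and treating 0 as "not specified" follows the `@size_file > 0` and `time_file != 0` checks in the source listed on this page.

```ruby
# Hypothetical helper: names the rotation behaviour implied by the two settings.
# 0 is treated as "not specified", matching the checks in the plugin source.
def rotation_mode(size_file, time_file)
  by_size = size_file > 0
  by_time = time_file > 0

  if by_size && by_time
    :size_or_time          # rotate when either limit is reached
  elsif by_time
    :time_only             # one file per tag, pushed every time_file minutes
  elsif by_size
    :size_only             # new part whenever a file grows past size_file
  else
    :never_until_restart   # the "curious mode": file stays local
  end
end

puts rotation_mode(2048, 5)
puts rotation_mode(0, 0)
```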

Usage: this is an example of a Logstash config:

output {
  s3 {
    access_key_id => "crazy_key"             # required
    secret_access_key => "monkey_access_key" # required
    endpoint_region => "eu-west-1"           # required
    bucket => "boss_please_open_your_bucket" # required
    size_file => 2048                        # optional
    time_file => 5                           # optional
    format => "plain"                        # optional
    canned_acl => "private"                  # optional; one of "private", "public_read", "public_read_write", "authenticated_read"; defaults to "private"
  }
}

Constant Summary

TEMPFILE_EXTENSION =
"txt"
S3_INVALID_CHARACTERS =
/[\^`><]/

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#page_counter ⇒ Object (readonly)

Returns the value of attribute page_counter.

# File 'lib/logstash/outputs/s3.rb', line 125

def page_counter
  @page_counter
end

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.

# File 'lib/logstash/outputs/s3.rb', line 126

def s3
  @s3
end

#tempfile ⇒ Object

Exposed attribute for testing purposes.

# File 'lib/logstash/outputs/s3.rb', line 124

def tempfile
  @tempfile
end

Instance Method Details

#aws_s3_config ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 128

def aws_s3_config
  @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
  @s3 = AWS::S3.new(aws_options_hash)
end

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 133

def aws_service_endpoint(region)
  # Make the deprecated endpoint_region work
  # TODO: (ph) Remove this after deprecation.
  
  if @endpoint_region
    region_to_use = @endpoint_region
  else
    region_to_use = @region
  end

  return {
    :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
  }
end
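The region handling above can be tried in isolation. The sketch below copies the two decisions from the method (prefer the deprecated `endpoint_region` when set, and special-case `us-east-1`) into standalone functions; `region_to_use` and `s3_endpoint_for` are hypothetical names introduced for illustration.

```ruby
# Hypothetical standalone versions of the two decisions made above.

# Prefer the deprecated endpoint_region when it is set, else fall back to region.
def region_to_use(endpoint_region, region)
  endpoint_region || region
end

# us-east-1 uses the bare host name; other regions get a region-specific host.
def s3_endpoint_for(region)
  region == "us-east-1" ? "s3.amazonaws.com" : "s3-#{region}.amazonaws.com"
end

puts s3_endpoint_for(region_to_use(nil, "us-east-1"))
puts s3_endpoint_for(region_to_use("eu-west-1", "us-east-1"))
```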

#close ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 322

def close
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @file_rotation_lock.synchronize do
    @tempfile.close unless @tempfile.nil? || @tempfile.closed?
  end
end

#create_temporary_file ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 173

def create_temporary_file
  filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))

  @logger.debug("S3: Creating a new temporary file", :filename => filename)

  @file_rotation_lock.synchronize do
    unless @tempfile.nil?
      @tempfile.close
    end

    @tempfile = File.open(filename, "a")
  end
end

#get_temporary_filename(page_counter = 0) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 278

def get_temporary_filename(page_counter = 0)
  current_time = Time.now
  filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"

  if @tags.size > 0
    return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  else
    return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  end
end
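To see what this method produces without depending on the current clock or machine, the same format can be rebuilt with the time and host name passed in. This is a sketch under that assumption: `temporary_filename` and its `now:` and `hostname:` keyword arguments are hypothetical, and the `txt` extension is hard-coded in place of TEMPFILE_EXTENSION.

```ruby
require "socket"

# Hypothetical variant of the method above with injectable time and host name.
def temporary_filename(tags, page_counter = 0, now: Time.now, hostname: Socket.gethostname)
  base = "ls.s3.#{hostname}.#{now.strftime('%Y-%m-%dT%H.%M')}"
  tag_part = tags.empty? ? "" : ".tag_#{tags.join('.')}"
  "#{base}#{tag_part}.part#{page_counter}.txt"
end

fixed_time = Time.utc(2013, 4, 18, 10, 0)
puts temporary_filename(["hello"], 0, now: fixed_time, hostname: "ip-10-228-27-95")
# prints "ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt"
```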

#move_file_to_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 256

def move_file_to_bucket(file)
  if !File.zero?(file)
    write_on_bucket(file)
    @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
  end

  begin
    File.delete(file)
  rescue Errno::ENOENT
    # Something else deleted the file, logging but not raising the issue
    @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file))
  rescue Errno::EACCES
    @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
  end
end
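The upload-then-delete flow above can be exercised without S3. In this sketch the upload is replaced by appending the file name to an array, and `move_file` is a hypothetical stand-in rather than the plugin's method. It uses `File.size?` so that a missing file is never treated as uploadable, which is a small deviation from the `File.zero?` check in the source.

```ruby
require "tmpdir"

# Hypothetical stand-in: "uploads" non-empty files by recording their names,
# then deletes the local copy, tolerating a file that has already disappeared.
def move_file(path, uploaded)
  uploaded << File.basename(path) if File.size?(path)
  File.delete(path)
  :deleted
rescue Errno::ENOENT
  :already_gone
end

Dir.mktmpdir do |dir|
  full = File.join(dir, "full.txt")
  File.write(full, "event\n")
  uploaded = []
  puts move_file(full, uploaded)   # first call removes the file
  puts move_file(full, uploaded)   # second call finds nothing to delete
end
```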

#periodic_interval ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 273

def periodic_interval
  @time_file * 60
end

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 290

def receive(event)
  
  @codec.encode(event)
end

#register ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 188

def register
  require "aws-sdk"
  # required if using ruby version < 2.0
  # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
  AWS.eager_autoload!(AWS::S3)

  workers_not_supported

  @s3 = aws_s3_config
  @upload_queue = Queue.new
  @file_rotation_lock = Mutex.new

  if @prefix && @prefix =~ S3_INVALID_CHARACTERS
    @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
    raise LogStash::ConfigurationError, "S3: prefix contains invalid characters"
  end

  if !Dir.exist?(@temporary_directory)
    FileUtils.mkdir_p(@temporary_directory)
  end

  test_s3_write

  restore_from_crashes if @restore == true
  reset_page_counter
  create_temporary_file
  configure_periodic_rotation if time_file != 0
  configure_upload_workers

  @codec.on_event do |event, encoded_event|
    handle_event(encoded_event)
  end
end
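The prefix check in register relies on the S3_INVALID_CHARACTERS pattern from the constant summary. A minimal sketch of that check in isolation follows; `valid_prefix?` is a hypothetical helper, and the regular expression is copied from the constant.

```ruby
# Hypothetical helper mirroring the prefix validation in register.
def valid_prefix?(prefix)
  invalid_characters = /[\^`><]/   # same pattern as S3_INVALID_CHARACTERS
  prefix.nil? || prefix !~ invalid_characters
end

puts valid_prefix?("logs/2013/")   # prints "true"
puts valid_prefix?("logs>2013")    # prints "false"
```

In the plugin, a prefix that fails this check raises LogStash::ConfigurationError instead of returning false.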

#restore ⇒ Object

IMPORTANT: if you use multiple instances of the s3 output, you should specify “restore => true” on one of them and “restore => false” on the others. This is a hack to avoid destroying the new files after restoring the initial files. If you do not specify “restore => true”, then when Logstash crashes or is restarted the leftover files are not sent to the bucket, for example if you have a single instance.

# File 'lib/logstash/outputs/s3.rb', line 97

config :restore, :validate => :boolean, :default => false

#restore_from_crashes ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 245

def restore_from_crashes
  @logger.debug("S3: is attempting to verify previous crashes...")

  Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
    name_file = File.basename(file)
    @logger.warn("S3: have found temporary file the upload process crashed, uploading file to S3.", :filename => name_file)
    move_file_to_bucket_async(file)
  end
end
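The discovery step of the restore logic is a directory glob on the temporary-file extension. The sketch below shows that step alone against a throwaway directory; `leftover_tempfiles` is a hypothetical helper, and the result is sorted only to make the output predictable.

```ruby
require "tmpdir"

# Hypothetical helper: list files left behind in the temporary directory,
# using the same glob shape as restore_from_crashes.
def leftover_tempfiles(directory, extension = "txt")
  Dir[File.join(directory, "*.#{extension}")].sort
end

Dir.mktmpdir do |dir|
  %w[a.txt b.txt c.log].each { |name| File.write(File.join(dir, name), "x") }
  puts leftover_tempfiles(dir).map { |path| File.basename(path) }.inspect
  # prints ["a.txt", "b.txt"]
end
```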

#rotate_events_log? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/logstash/outputs/s3.rb', line 296

def rotate_events_log?
  @file_rotation_lock.synchronize do
    @tempfile.size > @size_file
  end
end
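The comparison is strict, so a file whose size equals size_file exactly is not rotated yet. A small sketch with a real temporary file makes that visible. The free-standing `rotate?` function and its arguments are hypothetical stand-ins for the instance variables used above.

```ruby
require "tempfile"

# Hypothetical free-standing version of the check above.
def rotate?(tempfile, size_file, lock)
  lock.synchronize { tempfile.size > size_file }
end

Tempfile.create("rotate") do |file|
  lock = Mutex.new
  file.syswrite("12345")
  puts rotate?(file, 5, lock)   # size equals the limit, prints "false"
  file.syswrite("6")
  puts rotate?(file, 5, lock)   # size exceeds the limit, prints "true"
end
```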

#test_s3_write ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 226

def test_s3_write
  @logger.debug("S3: Creating a test file on S3")

  test_filename = File.join(@temporary_directory,
                            "logstash-programmatic-access-test-object-#{Time.now.to_i}")

  File.open(test_filename, 'a') do |file|
    file.write('test')
  end

  begin
    write_on_bucket(test_filename)
    delete_on_bucket(test_filename)
  ensure
    File.delete(test_filename)
  end
end
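The begin/ensure shape above guarantees that the local probe file is removed even when the upload raises. The sketch below demonstrates that with a failing uploader. `probe_write` and the lambda are hypothetical, and no S3 call is made.

```ruby
require "tmpdir"

# Hypothetical sketch of the probe pattern: write a small file, hand it to an
# uploader, and always clean up the local copy.
def probe_write(directory, uploader)
  probe = File.join(directory, "probe-#{Time.now.to_i}")
  File.write(probe, "test")
  begin
    uploader.call(probe)
  ensure
    File.delete(probe)
  end
end

Dir.mktmpdir do |dir|
  failing_uploader = ->(_path) { raise IOError, "upload failed" }
  begin
    probe_write(dir, failing_uploader)
  rescue IOError => e
    puts "upload error propagated: #{e.message}"
  end
  puts Dir.glob(File.join(dir, "*")).empty?   # prints "true"
end
```

The error still reaches the caller, which matches the plugin's behaviour of failing registration when the bucket is not writable.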

#write_events_to_multiple_files? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/logstash/outputs/s3.rb', line 303

def write_events_to_multiple_files?
  @size_file > 0
end

#write_on_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 149

def write_on_bucket(file)
  # find and use the bucket
  bucket = @s3.buckets[@bucket]

  remote_filename = "#{@prefix}#{File.basename(file)}"

  @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)

  File.open(file, 'r') do |fileIO|
    begin
      # prepare for write the file
      object = bucket.objects[remote_filename]
      object.write(fileIO, :acl => @canned_acl)
    rescue AWS::Errors::Base => error
      @logger.error("S3: AWS error", :error => error)
      raise LogStash::Error, "AWS Configuration Error, #{error}"
    end
  end

  @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket  => @bucket, :canned_acl => @canned_acl)
end

#write_to_tempfile(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 308

def write_to_tempfile(event)
  begin
    @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))

    @file_rotation_lock.synchronize do
      @tempfile.syswrite(event)
    end
  rescue Errno::ENOSPC
    @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
    close
  end
end
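Writes go through the same lock that guards file rotation, so an event is never written while the file is being swapped. The sketch below shows several threads sharing one file and one Mutex. `write_concurrently` and its keyword arguments are hypothetical and only illustrate the locking pattern.

```ruby
require "tempfile"

# Hypothetical sketch: several writer threads share one file and one lock,
# as write_to_tempfile shares @file_rotation_lock with the rotation code.
def write_concurrently(file, lock, writers: 4, lines_each: 50)
  threads = writers.times.map do |id|
    Thread.new do
      lines_each.times do |n|
        lock.synchronize { file.syswrite("writer #{id} line #{n}\n") }
      end
    end
  end
  threads.each(&:join)
end

Tempfile.create("events") do |file|
  write_concurrently(file, Mutex.new)
  puts File.readlines(file.path).size   # prints "200"
end
```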