Class: LogStash::Outputs::S3

Inherits:
Base
  • Object
Includes:
PluginMixins::AwsConfig
Defined in:
lib/logstash/outputs/s3.rb

Overview

INFORMATION:

This plugin sends Logstash events to Amazon Simple Storage Service (Amazon S3). To use it you need a valid S3 bucket and permission to write files to it. Also be sure to run Logstash as superuser so that it can establish a connection.

This plugin writes temporary files to “/opt/logstash/S3_temp/”. If you want, you can change the path at the start of the register method. These files have a special name, for example:

ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt

“ls.s3” : indicates the Logstash S3 plugin.

“ip-10-228-27-95” : indicates the hostname of your machine.

“2013-04-18T10.00” : represents the time, used whenever you specify time_file.

“tag_hello” : indicates the event’s tag.

“part0” : the part number. If you specify size_file, more parts are generated whenever the file size exceeds size_file.
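The naming scheme can be sketched in plain Ruby. This is only an illustration modeled on the get_temporary_filename listing further down; the helper name temp_filename and its explicit arguments are hypothetical, chosen so the example runs without Logstash.

```ruby
# Hypothetical helper: builds a temporary file name following the scheme above.
# The real plugin takes the hostname, tags and part counter from its own state.
def temp_filename(hostname, time, tags = [], part = 0)
  base = "ls.s3.#{hostname}.#{time.strftime('%Y-%m-%dT%H.%M')}"
  base += ".tag_#{tags.join('.')}" unless tags.empty?
  "#{base}.part#{part}.txt"
end

puts temp_filename("ip-10-228-27-95", Time.new(2013, 4, 18, 10, 0), ["hello"])
# prints ls.s3.ip-10-228-27-95.2013-04-18T10.00.tag_hello.part0.txt
```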

When a file is full, it is pushed to the bucket and deleted from the temporary directory.
If a file is empty, it is not pushed and it is not deleted.

This plugin has a mechanism to restore the previous temporary files if something crashes.

Note:

If you specify both size_file and time_file, the plugin creates a file for each tag (if specified). When time_file elapses or a file’s size exceeds size_file, the files are pushed to the S3 bucket and deleted from local disk.

If you specify time_file but not size_file, the plugin creates only one file for each tag (if specified). When time_file elapses, the files are pushed to the S3 bucket and deleted from local disk.

If you specify size_file but not time_file, the plugin creates files for each tag (if specified). When a file’s size exceeds size_file, it is pushed to the S3 bucket and deleted from local disk.

If you specify neither size_file nor time_file, you get a curious mode: the plugin creates only one file for each tag (if specified). The file stays in the temporary directory and is not pushed to the bucket until Logstash is restarted.
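The four cases in the note can be summarized as a small decision function. This is a sketch, not the plugin’s code: the name rotation_mode and the returned symbols are hypothetical, and it assumes that 0 stands for “not specified”, which is consistent with the `time_file != 0` and `@size_file > 0` checks in the listings below.

```ruby
# Hypothetical summary of the rotation modes; 0 is assumed to mean "not specified".
def rotation_mode(size_file, time_file)
  if size_file > 0 && time_file > 0
    :size_or_time   # push when the size limit or the time limit is reached
  elsif time_file > 0
    :time_only      # one file per tag, pushed when time_file elapses
  elsif size_file > 0
    :size_only      # a new part each time the size limit is exceeded
  else
    :never          # file stays local until Logstash is restarted
  end
end

puts rotation_mode(2048, 5)
puts rotation_mode(0, 0)
```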

#### Usage:

This is an example of Logstash config:

output {
  s3 {
    access_key_id => "crazy_key"             # required
    secret_access_key => "monkey_access_key" # required
    endpoint_region => "eu-west-1"           # required
    bucket => "boss_please_open_your_bucket" # required
    size_file => 2048                        # optional
    time_file => 5                           # optional
    format => "plain"                        # optional
    canned_acl => "private"                  # optional; one of "private", "public_read", "public_read_write", "authenticated_read"; defaults to "private"
  }
}

Constant Summary

TEMPFILE_EXTENSION = "txt"
S3_INVALID_CHARACTERS = /[\^`><]/
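As an illustration of how a character-class pattern like S3_INVALID_CHARACTERS can reject a key prefix, here is a minimal sketch. The constant name INVALID_PREFIX_CHARS and the helper valid_prefix? are hypothetical; only the regular expression itself is taken from the constant above.

```ruby
# Same pattern as S3_INVALID_CHARACTERS, under a hypothetical name
# so the example stands alone: rejects ^, backtick, > and <.
INVALID_PREFIX_CHARS = /[\^`><]/

# Hypothetical helper: true when the prefix contains none of those characters.
def valid_prefix?(prefix)
  prefix.to_s !~ INVALID_PREFIX_CHARS
end

puts valid_prefix?("logs/2013/")
puts valid_prefix?("logs<2013>")
```

In the register listing below, a prefix that matches the pattern causes a LogStash::ConfigurationError to be raised.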

Instance Attribute Details

#page_counter ⇒ Object (readonly)

Returns the value of attribute page_counter.

# File 'lib/logstash/outputs/s3.rb', line 115

def page_counter
  @page_counter
end

#s3 ⇒ Object (readonly)

Returns the value of attribute s3.

# File 'lib/logstash/outputs/s3.rb', line 116

def s3
  @s3
end

#tempfile ⇒ Object

Attributes exposed for testing purposes.

# File 'lib/logstash/outputs/s3.rb', line 114

def tempfile
  @tempfile
end

Instance Method Details

#aws_s3_config ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 118

def aws_s3_config
  @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region)
  @s3 = AWS::S3.new(aws_options_hash)
end

#aws_service_endpoint(region) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 123

def aws_service_endpoint(region)
  # Make the deprecated endpoint_region work
  # TODO: (ph) Remove this after deprecation.
  
  if @endpoint_region
    region_to_use = @endpoint_region
  else
    region_to_use = @region
  end

  return {
    :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com"
  }
end
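The endpoint rule in the listing above can be tried in isolation. The following sketch extracts just the ternary expression into a hypothetical standalone function, s3_endpoint_for, without the instance variables or the deprecated endpoint_region fallback.

```ruby
# Hypothetical standalone version of the endpoint rule shown above:
# us-east-1 uses the global host, every other region gets a regional host.
def s3_endpoint_for(region)
  region == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region}.amazonaws.com"
end

puts s3_endpoint_for('us-east-1')  # prints s3.amazonaws.com
puts s3_endpoint_for('eu-west-1')  # prints s3-eu-west-1.amazonaws.com
```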

#close ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 312

def close
  shutdown_upload_workers
  @periodic_rotation_thread.stop! if @periodic_rotation_thread

  @file_rotation_lock.synchronize do
    # Skip the close when there is no tempfile or it is already closed.
    @tempfile.close unless @tempfile.nil? || @tempfile.closed?
  end
end

#create_temporary_file ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 163

def create_temporary_file
  filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))

  @logger.debug("S3: Creating a new temporary file", :filename => filename)

  @file_rotation_lock.synchronize do
    unless @tempfile.nil?
      @tempfile.close
    end

    @tempfile = File.open(filename, "a")
  end
end

#get_temporary_filename(page_counter = 0) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 268

def get_temporary_filename(page_counter = 0)
  current_time = Time.now
  filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}"

  if @tags.size > 0
    return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  else
    return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}"
  end
end

#move_file_to_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 246

def move_file_to_bucket(file)
  if !File.zero?(file)
    write_on_bucket(file)
    @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
  end

  begin
    File.delete(file)
  rescue Errno::ENOENT
    # Something else deleted the file, logging but not raising the issue
    @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file))
  rescue Errno::EACCES
    @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
  end
end
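The control flow of the listing above (upload only when the file is non-empty, then attempt the delete) can be exercised without S3. In this sketch the helper push_and_delete is hypothetical and the block stands in for write_on_bucket; it only records file names instead of uploading anything.

```ruby
require "tmpdir"

# Hypothetical stand-in for the flow above: the block plays the role of
# write_on_bucket and is called only for non-empty files.
def push_and_delete(path)
  yield path unless File.zero?(path)
  File.delete(path)
  true
rescue Errno::ENOENT
  false # the file was already removed by something else
end

uploaded = []
remaining = nil

Dir.mktmpdir do |dir|
  full  = File.join(dir, "full.txt")
  empty = File.join(dir, "empty.txt")
  File.write(full, "event\n")
  File.write(empty, "")

  push_and_delete(full)  { |f| uploaded << File.basename(f) }
  push_and_delete(empty) { |f| uploaded << File.basename(f) }

  remaining = Dir.glob(File.join(dir, "*"))
end

puts uploaded.inspect
```

As the listing is written, the File.delete call runs whether or not the upload branch was taken.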

#periodic_interval ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 263

def periodic_interval
  @time_file * 60
end

#receive(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 280

def receive(event)
  
  @codec.encode(event)
end

#register ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 178

def register
  require "aws-sdk"
  # required if using ruby version < 2.0
  # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby
  AWS.eager_autoload!(AWS::S3)

  workers_not_supported

  @s3 = aws_s3_config
  @upload_queue = Queue.new
  @file_rotation_lock = Mutex.new

  if @prefix && @prefix =~ S3_INVALID_CHARACTERS
    @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
    raise LogStash::ConfigurationError, "S3: prefix contains invalid characters"
  end

  if !Dir.exist?(@temporary_directory)
    FileUtils.mkdir_p(@temporary_directory)
  end

  test_s3_write

  restore_from_crashes if @restore == true
  reset_page_counter
  create_temporary_file
  configure_periodic_rotation if time_file != 0
  configure_upload_workers

  @codec.on_event do |event, encoded_event|
    handle_event(encoded_event)
  end
end

#restore ⇒ Object

IMPORTANT: if you use multiple instances of the s3 output, set “restore => true” on one of them and “restore => false” on the others. This is a hack to avoid destroying the new files after restoring the initial files. If you do not set “restore => true”, the files are not sent to the bucket when Logstash crashes or is restarted, for example if you have a single instance.

# File 'lib/logstash/outputs/s3.rb', line 97

config :restore, :validate => :boolean, :default => false

#restore_from_crashes ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 235

def restore_from_crashes
  @logger.debug("S3: is attempting to verify previous crashes...")

  Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file|
    name_file = File.basename(file)
    @logger.warn("S3: have found temporary file the upload process crashed, uploading file to S3.", :filename => name_file)
    move_file_to_bucket_async(file)
  end
end
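The discovery step in the listing above relies on a glob over the temporary directory. A minimal sketch of that step alone, assuming a hypothetical helper leftover_files and a throwaway directory instead of the plugin’s @temporary_directory:

```ruby
require "tmpdir"

# Hypothetical helper: lists leftover temporary files by extension,
# using the same Dir[...] glob pattern as the listing above.
def leftover_files(directory, extension = "txt")
  Dir[File.join(directory, "*.#{extension}")].sort
end

found = nil

Dir.mktmpdir do |dir|
  File.write(File.join(dir, "ls.s3.host.2013-04-18T10.00.part0.txt"), "event\n")
  File.write(File.join(dir, "notes.log"), "ignored\n")
  found = leftover_files(dir).map { |f| File.basename(f) }
end

puts found.inspect
# prints ["ls.s3.host.2013-04-18T10.00.part0.txt"]
```

Only files ending in the temporary extension are picked up; anything else in the directory is ignored.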

#rotate_events_log? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/logstash/outputs/s3.rb', line 286

def rotate_events_log?
  @file_rotation_lock.synchronize do
    @tempfile.size > @size_file
  end
end
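To show how a lock-protected size check like the one above can drive rotation, here is a self-contained sketch. The class SizeRotatingBuffer is hypothetical and uses an in-memory StringIO instead of a temporary file; how the plugin actually reacts when the check returns true is not shown on this page, so the rotate step here is an assumption.

```ruby
require "stringio"

# Hypothetical in-memory analogue of size-based rotation.
class SizeRotatingBuffer
  attr_reader :part

  def initialize(size_limit)
    @size_limit = size_limit
    @lock = Mutex.new
    @buffer = StringIO.new
    @part = 0
  end

  # Append data, then start a new part if the limit was exceeded.
  def write(data)
    @lock.synchronize { @buffer.write(data) }
    rotate if rotate?
  end

  # Same shape as rotate_events_log?: read the size while holding the lock.
  def rotate?
    @lock.synchronize { @buffer.size > @size_limit }
  end

  private

  # Assumed reaction to a full buffer: start an empty one and bump the part number.
  def rotate
    @lock.synchronize do
      @buffer = StringIO.new
      @part += 1
    end
  end
end

buf = SizeRotatingBuffer.new(10)
buf.write("12345")    # 5 bytes, below the limit
buf.write("6789012")  # 12 bytes in total, exceeds the limit and rotates
puts buf.part         # prints 1
```

The lock is released before rotate? is called, because a Ruby Mutex cannot be locked again by the thread that already holds it.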

#test_s3_write ⇒ Object

# File 'lib/logstash/outputs/s3.rb', line 216

def test_s3_write
  @logger.debug("S3: Creating a test file on S3")

  test_filename = File.join(@temporary_directory,
                            "logstash-programmatic-access-test-object-#{Time.now.to_i}")

  File.open(test_filename, 'a') do |file|
    file.write('test')
  end

  begin
    write_on_bucket(test_filename)
    delete_on_bucket(test_filename)
  ensure
    File.delete(test_filename)
  end
end

#write_events_to_multiple_files? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/logstash/outputs/s3.rb', line 293

def write_events_to_multiple_files?
  @size_file > 0
end

#write_on_bucket(file) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 139

def write_on_bucket(file)
  # find and use the bucket
  bucket = @s3.buckets[@bucket]

  remote_filename = "#{@prefix}#{File.basename(file)}"

  @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)

  File.open(file, 'r') do |fileIO|
    begin
      # prepare for write the file
      object = bucket.objects[remote_filename]
      object.write(fileIO, :acl => @canned_acl)
    rescue AWS::Errors::Base => error
      @logger.error("S3: AWS error", :error => error)
      raise LogStash::Error, "AWS Configuration Error, #{error}"
    end
  end

  @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket  => @bucket, :canned_acl => @canned_acl)
end

#write_to_tempfile(event) ⇒ Object



# File 'lib/logstash/outputs/s3.rb', line 298

def write_to_tempfile(event)
  begin
    @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))

    @file_rotation_lock.synchronize do
      @tempfile.syswrite(event)
    end
  rescue Errno::ENOSPC
    @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
    close
  end
end