Class: LogStash::Outputs::S3::Uploader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/s3/uploader.rb

Constant Summary collapse

# Number of seconds #upload sleeps before retrying a failed upload.
TIME_BEFORE_RETRYING_SECONDS = 1

# Thread pool used when the constructor is not given one explicitly.
# It grows from 1 to at most 8 threads and queues at most one pending task.
# Per the concurrent-ruby documentation, the :caller_runs fallback policy makes
# a task that cannot be accepted run on the thread that posted it.
DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new(
  { min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs }
)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader



20
21
22
23
24
# File 'lib/logstash/outputs/s3/uploader.rb', line 20

# Builds an uploader bound to a bucket, a logger and a worker pool.
#
# @param bucket [Object] stored in @bucket; #upload calls +object(key)+ on it
# @param logger [Object] stored in @logger; #upload calls +error+ on it
# @param threadpool [Object] stored in @workers_pool; defaults to DEFAULT_THREADPOOL
def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
  @bucket, @workers_pool = bucket, threadpool
  @logger = logger
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



18
19
20
# File 'lib/logstash/outputs/s3/uploader.rb', line 18

# Reader for the bucket this uploader sends files to.
#
# @return [Object] the value held in @bucket, which the constructor assigns
#   from its +bucket+ argument
def bucket
  @bucket
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



18
19
20
# File 'lib/logstash/outputs/s3/uploader.rb', line 18

# Reader for the logger that #upload reports failures to.
#
# @return [Object] the value held in @logger, which the constructor assigns
#   from its +logger+ argument
def logger
  @logger
end

#upload_options ⇒ Object (readonly)

Returns the value of attribute upload_options.



18
19
20
# File 'lib/logstash/outputs/s3/uploader.rb', line 18

# Reader for the @upload_options instance variable.
#
# NOTE(review): none of the methods shown for this class assign
# @upload_options (#upload only uses a local variable of the same name), so
# this presumably returns nil unless it is set elsewhere -- confirm.
#
# @return [Object, nil] the value held in @upload_options
def upload_options
  @upload_options
end

Instance Method Details

#stop ⇒ Object



58
59
60
61
# File 'lib/logstash/outputs/s3/uploader.rb', line 58

# Shuts the worker pool down and waits for it to terminate.
#
# Passing +nil+ as the timeout means there is no time limit on the wait.
#
# @return [Object] whatever the pool's +wait_for_termination+ returns
def stop
  @workers_pool.tap(&:shutdown).wait_for_termination(nil)
end

#upload(file, options = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logstash/outputs/s3/uploader.rb', line 33

# Uploads +file+ to the bucket synchronously, then runs the optional
# completion callback.
#
# Any StandardError other than Errno::ENOENT raised while uploading is logged
# and the upload is retried, without an upper bound, after sleeping
# TIME_BEFORE_RETRYING_SECONDS. A missing local file (Errno::ENOENT) is logged
# as unrecoverable and not retried; the completion callback still runs.
#
# @param file [#key, #path] +key+ names the destination object in the bucket,
#   +path+ is the local file handed to +upload_file+
# @param options [Hash] optional settings:
#   - :upload_options [Hash] passed through to +upload_file+ (default: {})
#   - :on_complete [#call] invoked with +file+ once the upload step is over
# @return [Object, nil] the callback's return value, or nil without a callback
# @raise [StandardError] whatever the :on_complete callback raises (after
#   logging it)
def upload(file, options = {})
  upload_options = options.fetch(:upload_options, {})

  begin
    bucket.object(file.key).upload_file(file.path, upload_options)
  rescue Errno::ENOENT => e
    logger.error("File doesn't exist! Unrecoverable error.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
  rescue => e
    # Getting here usually means the S3 client already retried on its own
    # (3 attempts by default, according to the original author's note) and
    # gave up, or that some other error happened. Wait, then try again.
    #
    # The thread may stay stuck in this loop, which is deliberate: blocking is
    # preferred over losing data, whether the error is transient or serious.
    logger.error("Uploading failed, retrying.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    sleep TIME_BEFORE_RETRYING_SECONDS
    retry
  end

  on_complete = options[:on_complete]
  return if on_complete.nil?

  # Only the callback is guarded here, so the log line below is emitted solely
  # for failures that really come from `on_complete`.
  begin
    on_complete.call(file)
  rescue => e
    # Message text is kept verbatim so existing log matching keeps working.
    logger.error("An error occured in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    raise # re-raise: this method does not handle callback failures itself
  end
end

#upload_async(file, options = {}) ⇒ Object



26
27
28
29
30
31
# File 'lib/logstash/outputs/s3/uploader.rb', line 26

# Hands the upload of +file+ to the worker pool instead of running it inline.
#
# The posted task first renames its thread after the file being uploaded and
# then delegates to #upload with the same arguments.
#
# @param file [#path] file to upload; +path+ is used in the thread name
# @param options [Hash] forwarded unchanged to #upload
# @return [Object] whatever the pool's +post+ returns
def upload_async(file, options = {})
  task = proc do
    LogStash::Util.set_thread_name("S3 output uploader, file: #{file.path}")
    upload(file, options)
  end

  @workers_pool.post(&task)
end