Class: LogStash::Outputs::OSS::FileUploader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/oss/file_uploader.rb

Constant Summary collapse

TIME_BEFORE_RETRY_SECONDS =
3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(oss, bucket, additional_oss_settings, logger, thread_pool) ⇒ FileUploader

Returns a new instance of FileUploader.



14
15
16
17
18
19
20
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 14

def initialize(oss, bucket, additional_oss_settings, logger, thread_pool)
  @oss = oss
  @bucket = bucket
  @additional_oss_settings = additional_oss_settings
  @logger = logger
  @thread_pool = thread_pool
end

Instance Attribute Details

#additional_oss_settingsObject (readonly)

Returns the value of attribute additional_oss_settings.



12
13
14
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12

def additional_oss_settings
  @additional_oss_settings
end

#bucketObject (readonly)

Returns the value of attribute bucket.



12
13
14
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12

def bucket
  @bucket
end

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12

def logger
  @logger
end

#ossObject (readonly)

Returns the value of attribute oss.



12
13
14
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12

def oss
  @oss
end

Instance Method Details

#closeObject



62
63
64
65
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 62

def close
  @thread_pool.shutdown
  @thread_pool.wait_for_termination(nil)
end

#upload(file, options = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 29

def upload(file, options = {})
  meta = ObjectMetadata.new
  meta.setContentLength(file.size)
  unless @additional_oss_settings.nil?
    if @additional_oss_settings.include?(LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY)
      unless @additional_oss_settings[LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY].empty?
        meta.setServerSideEncryption(@additional_oss_settings[LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY])
      end
    end
  end

  stream = nil
  begin
    stream = FileInputStream.new(file.path)
    oss.putObject(@bucket, file.key, stream, meta)
  rescue Errno::ENOENT => e
    logger.error("Logstash OSS Output Plugin: file to be uploaded doesn't exist!", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
  rescue => e
    @logger.error("Logstash OSS Output Plugin: uploading failed, retrying.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    sleep TIME_BEFORE_RETRY_SECONDS
    retry
  ensure
    unless stream.nil?
      stream.close
    end
  end

  options[:on_complete].call(file) unless options[:on_complete].nil?
  rescue => e
    logger.error("Logstash OSS Output Plugin: an error occurred in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace)
    raise e
end

#upload_async(file, options = {}) ⇒ Object



22
23
24
25
26
27
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 22

def upload_async(file, options = {})
  @thread_pool.post do
    LogStash::Util.set_thread_name("Logstash OSS Output Plugin: output uploader, file: #{file.path}")
    upload(file, options)
  end
end