Class: LogStash::Outputs::OSS::FileUploader
- Inherits:
-
Object
- Object
- LogStash::Outputs::OSS::FileUploader
- Defined in:
- lib/logstash/outputs/oss/file_uploader.rb
Constant Summary collapse
- TIME_BEFORE_RETRY_SECONDS =
3
Instance Attribute Summary collapse
-
#additional_oss_settings ⇒ Object
readonly
Returns the value of attribute additional_oss_settings.
-
#bucket ⇒ Object
readonly
Returns the value of attribute bucket.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#oss ⇒ Object
readonly
Returns the value of attribute oss.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(oss, bucket, additional_oss_settings, logger, thread_pool) ⇒ FileUploader
constructor
A new instance of FileUploader.
- #upload(file, options = {}) ⇒ Object
- #upload_async(file, options = {}) ⇒ Object
Constructor Details
#initialize(oss, bucket, additional_oss_settings, logger, thread_pool) ⇒ FileUploader
Returns a new instance of FileUploader.
14 15 16 17 18 19 20 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 14 def initialize(oss, bucket, additional_oss_settings, logger, thread_pool) @oss = oss @bucket = bucket @additional_oss_settings = additional_oss_settings @logger = logger @thread_pool = thread_pool end |
Instance Attribute Details
#additional_oss_settings ⇒ Object (readonly)
Returns the value of attribute additional_oss_settings.
12 13 14 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12 def additional_oss_settings @additional_oss_settings end |
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
12 13 14 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12 def bucket @bucket end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12 def logger @logger end |
#oss ⇒ Object (readonly)
Returns the value of attribute oss.
12 13 14 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 12 def oss @oss end |
Instance Method Details
#close ⇒ Object
62 63 64 65 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 62 def close @thread_pool.shutdown @thread_pool.wait_for_termination(nil) end |
#upload(file, options = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 29 def upload(file, options = {}) meta = ObjectMetadata.new meta.setContentLength(file.size) unless @additional_oss_settings.nil? if @additional_oss_settings.include?(LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY) unless @additional_oss_settings[LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY].empty? meta.setServerSideEncryption(@additional_oss_settings[LogStash::Outputs::OSS::SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY]) end end end stream = nil begin stream = FileInputStream.new(file.path) oss.putObject(@bucket, file.key, stream, meta) rescue Errno::ENOENT => e logger.error("Logstash OSS Output Plugin: file to be uploaded doesn't exist!", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) rescue => e @logger.error("Logstash OSS Output Plugin: uploading failed, retrying.", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) sleep TIME_BEFORE_RETRY_SECONDS retry ensure unless stream.nil? stream.close end end options[:on_complete].call(file) unless options[:on_complete].nil? rescue => e logger.error("Logstash OSS Output Plugin: an error occurred in the `on_complete` uploader", :exception => e.class, :message => e.message, :path => file.path, :backtrace => e.backtrace) raise e end |
#upload_async(file, options = {}) ⇒ Object
22 23 24 25 26 27 |
# File 'lib/logstash/outputs/oss/file_uploader.rb', line 22 def upload_async(file, options = {}) @thread_pool.post do LogStash::Util.set_thread_name("Logstash OSS Output Plugin: output uploader, file: #{file.path}") upload(file, options) end end |