Class: LogStash::Outputs::Qingstor::Uploader
- Inherits:
-
Object
- Object
- LogStash::Outputs::Qingstor::Uploader
- Defined in:
- lib/logstash/outputs/qingstor/uploader.rb,
lib/logstash/outputs/qingstor/multipart_uploader.rb
Defined Under Namespace
Classes: MultipartUploader
Constant Summary collapse
- TIME_BEFORE_RETRYING_SECONDS =
1
- DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new( :min_thread => 1, :max_thread => 8, :max_queue => 1, :fallback_policy => :caller_runs )
Instance Attribute Summary collapse
-
#bucket ⇒ Object
readonly
Returns the value of attribute bucket.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#upload_options ⇒ Object
readonly
Returns the value of attribute upload_options.
Instance Method Summary collapse
-
#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader
constructor
A new instance of Uploader.
- #process_encrypt_options(upload_options) ⇒ Object
- #stop ⇒ Object
- #upload(file, options = {}) ⇒ Object
- #upload_async(file, options = {}) ⇒ Object
Constructor Details
#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader
Returns a new instance of Uploader.
25 26 27 28 29 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 25 def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) @bucket = bucket @logger = logger @workers_pool = threadpool end |
Instance Attribute Details
#bucket ⇒ Object (readonly)
Returns the value of attribute bucket.
23 24 25 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23 def bucket @bucket end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
23 24 25 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23 def logger @logger end |
#upload_options ⇒ Object (readonly)
Returns the value of attribute upload_options.
23 24 25 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23 def upload_options @upload_options end |
Instance Method Details
#process_encrypt_options(upload_options) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 55 def process_encrypt_options(upload_options) res = {} unless upload_options[:server_side_encryption_algorithm].nil? base64_key = Base64.strict_encode64(upload_options[:customer_key]) key_md5 = Digest::MD5.hexdigest(upload_options[:customer_key]) base64_key_md5 = Base64.strict_encode64(key_md5) res.merge!( 'x_qs_encryption_customer_algorithm' => upload_options[:server_side_encryption_algorithm], 'x_qs_encryption_customer_key' => base64_key, 'x_qs_encryption_customer_key_md5' => base64_key_md5 ) end res end |
#stop ⇒ Object
73 74 75 76 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 73 def stop @workers_pool.shutdown @workers_pool.wait_for_termination(nil) end |
#upload(file, options = {}) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 37 def upload(file, options = {}) upload_options = options.fetch(:upload_options, {}) upload_headers = process_encrypt_options(upload_options) if file.size > 50 * 1024 * 1024 @logger.info('Multipart uploading file', :file => file.key) multipart_uploader = MultipartUploader.new(@bucket, @logger, file, upload_headers) multipart_uploader.upload else upload_headers['content_md5'] = Digest::MD5.file(file.path).to_s upload_headers['body'] = ::File.read(file.path) @logger.info('Uploading file', :file => file.key) @bucket.put_object(file.key, upload_headers) end options[:on_complete].call(file) unless options[:on_complete].nil? end |
#upload_async(file, options = {}) ⇒ Object
31 32 33 34 35 |
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 31 def upload_async(file, options = {}) @workers_pool.post do upload(file, options) end end |