Class: LogStash::Outputs::Qingstor::Uploader

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/qingstor/uploader.rb,
lib/logstash/outputs/qingstor/multipart_uploader.rb

Defined Under Namespace

Classes: MultipartUploader

Constant Summary collapse

# Not referenced by any method shown on this page; presumably the pause (in
# seconds) between upload retries -- confirm against the rest of the plugin.
TIME_BEFORE_RETRYING_SECONDS =
1
# Pool used by #initialize when the caller does not pass one.
# The option keys must be the plural :min_threads / :max_threads; the
# singular spellings are not recognised by concurrent-ruby and would be
# silently ignored, leaving the pool on the library's default bounds.
# With :max_queue => 1 and :caller_runs, a task posted while the pool is
# saturated is executed by the posting thread instead of being rejected.
DEFAULT_THREADPOOL =
Concurrent::ThreadPoolExecutor.new(
  :min_threads => 1,
  :max_threads => 8,
  :max_queue => 1,
  :fallback_policy => :caller_runs
)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL) ⇒ Uploader

Returns a new instance of Uploader.



25
26
27
28
29
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 25

# Stores the collaborators the uploader works with.
#
# @param bucket [Object] kept in @bucket; #upload calls put_object on it
# @param logger [Object] kept in @logger; #upload calls info on it
# @param threadpool [Object] kept in @workers_pool; #upload_async posts
#   work to it and #stop shuts it down (defaults to DEFAULT_THREADPOOL)
def initialize(bucket, logger, threadpool = DEFAULT_THREADPOOL)
  @bucket, @logger = bucket, logger
  @workers_pool = threadpool
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



23
24
25
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23

# Reader for @bucket, the value passed as the first argument to #initialize.
def bucket
  @bucket
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



23
24
25
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23

# Reader for @logger, the value passed as the second argument to #initialize.
def logger
  @logger
end

#upload_options ⇒ Object (readonly)

Returns the value of attribute upload_options.



23
24
25
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 23

# Reader for @upload_options.
# NOTE(review): #initialize as shown here never assigns @upload_options, and
# #upload only uses a local variable of the same name, so this returns nil
# unless something outside this page sets the ivar -- confirm it is still needed.
def upload_options
  @upload_options
end

Instance Method Details

#process_encrypt_options(upload_options) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 55

# Translates the plugin's encryption settings into request header entries.
#
# When :server_side_encryption_algorithm is nil (or absent) no headers are
# produced. Otherwise the result carries the algorithm, the Base64 form of
# :customer_key, and the Base64 form of the key's MD5 *hex* digest.
#
# @param upload_options [Hash] read for :server_side_encryption_algorithm
#   and :customer_key
# @return [Hash{String => String}] header name => value; empty when no
#   algorithm is configured
def process_encrypt_options(upload_options)
  algorithm = upload_options[:server_side_encryption_algorithm]
  return {} if algorithm.nil?

  customer_key = upload_options[:customer_key]
  encoded_key = Base64.strict_encode64(customer_key)
  # The digest is hex-encoded first and then Base64-encoded.
  encoded_key_md5 = Base64.strict_encode64(Digest::MD5.hexdigest(customer_key))

  {
    'x_qs_encryption_customer_algorithm' => algorithm,
    'x_qs_encryption_customer_key' => encoded_key,
    'x_qs_encryption_customer_key_md5' => encoded_key_md5
  }
end

#stop ⇒ Object



73
74
75
76
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 73

# Shuts the worker pool down and then waits for it to terminate.
#
# The wait is requested with a nil timeout (per concurrent-ruby, that means
# no time limit).
#
# @return [Object] whatever the pool's wait_for_termination returns
def stop
  pool = @workers_pool
  pool.shutdown
  pool.wait_for_termination(nil)
end

#upload(file, options = {}) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 37

# Uploads +file+ to the bucket and then runs the optional completion callback.
#
# Files larger than 50 MiB are delegated to MultipartUploader. Anything else
# is sent with a single put_object call whose headers carry the file bytes
# ('body') and their MD5 hex digest ('content_md5'), plus any encryption
# headers produced by #process_encrypt_options.
#
# @param file [#size, #key, #path] object describing the local file
# @param options [Hash]
# @option options [Hash] :upload_options forwarded to #process_encrypt_options
#   (defaults to an empty hash)
# @option options [#call] :on_complete invoked with +file+ once the upload
#   call has returned
# @return [Object, nil] the callback's return value, or nil without a callback
def upload(file, options = {})
  upload_options = options.fetch(:upload_options, {})
  upload_headers = process_encrypt_options(upload_options)

  if file.size > 50 * 1024 * 1024
    @logger.info('Multipart uploading file', :file => file.key)
    multipart_uploader = MultipartUploader.new(@bucket, @logger, file, upload_headers)
    multipart_uploader.upload
  else
    # Read the payload once, in binary mode: text-mode reads may translate or
    # truncate bytes on some platforms, and hashing the very bytes that are
    # sent guarantees the checksum matches the body.
    body = ::File.binread(file.path)
    upload_headers['content_md5'] = Digest::MD5.hexdigest(body)
    upload_headers['body'] = body
    @logger.info('Uploading file', :file => file.key)
    @bucket.put_object(file.key, upload_headers)
  end

  options[:on_complete].call(file) unless options[:on_complete].nil?
end

#upload_async(file, options = {}) ⇒ Object



31
32
33
34
35
# File 'lib/logstash/outputs/qingstor/uploader.rb', line 31

# Schedules #upload on the worker pool instead of running it inline.
#
# @param file [Object] forwarded unchanged to #upload
# @param options [Hash] forwarded unchanged to #upload
# @return [Object] whatever the pool's post call returns
def upload_async(file, options = {})
  @workers_pool.post { upload(file, options) }
end