Class: Fluent::OSSOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::OSSOutput
- Defined in:
- lib/fluent/plugin/out_oss.rb
Instance Method Summary collapse
- #compress(chunk, tmp) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #process_object_key_format(chunk, key_format) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#compress(chunk, tmp) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/fluent/plugin/out_oss.rb', line 29 def compress(chunk, tmp) res = system "gzip -c #{chunk.path} > #{tmp.path}" unless res log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}" begin tmp.truncate(0) gw = Zlib::GzipWriter.new(tmp) chunk.write_to(gw) gw.close ensure gw.close rescue nil end end end |
#configure(conf) ⇒ Object
25 26 27 |
# File 'lib/fluent/plugin/out_oss.rb', line 25 def configure(conf) super end |
#format(tag, time, record) ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/out_oss.rb', line 71 def format(tag, time, record) {tag: tag, timestamp: time, log: record}.to_json + "\n" end |
#process_object_key_format(chunk, key_format) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/out_oss.rb', line 44 def process_object_key_format(chunk, key_format) key_map = { host: Socket.gethostname, time_slice: chunk.key, uuid: SecureRandom.hex(4), file_ext: 'gz' } result = key_format key_map.each do |k, v| result = result.gsub("%{#{k.to_s}}", v) end result end |
#start ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_oss.rb', line 58 def start super Aliyun::Common::Logging.set_log_file('/dev/null') @client = Aliyun::OSS::Client.new( :endpoint => @oss_endpoint, :access_key_id => @oss_key_id, :access_key_secret => @oss_key_secret) raise "Specific bucket not exists: #{@oss_bucket}" unless @client.bucket_exists? @oss_bucket @bucket = @client.get_bucket(@oss_bucket) end |
#write(chunk) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/out_oss.rb', line 75 def write(chunk) begin f = Tempfile.new('oss-') compress(chunk, f) path = process_object_key_format(chunk, @oss_object_key_format) raise "Upload #{f.path} failed" unless @bucket.resumable_upload(path, f.path) ensure f.close(true) end end |