Class: Fluent::OSSOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_oss.rb

Instance Method Summary collapse

Instance Method Details

#compress(chunk, tmp) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/out_oss.rb', line 29

# Gzip-compresses the file backing +chunk+ into the file backing +tmp+.
#
# The external +gzip+ binary is tried first; when it cannot be run or exits
# with a non-zero status, the data is compressed in-process through
# Zlib::GzipWriter instead.
#
# @param chunk [#path, #write_to] buffer chunk to compress
# @param tmp [#path, #truncate] writable destination (a Tempfile in #write)
def compress(chunk, tmp)
  # Argument-vector form: no shell is involved, so paths containing spaces
  # or shell metacharacters are passed through verbatim. The +out:+
  # redirect creates/truncates the destination just like `>` would.
  return if system('gzip', '-c', chunk.path, out: tmp.path)

  log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
  # Discard whatever a failed gzip run may have left behind.
  tmp.truncate(0)
  # The block form closes the writer (and with it +tmp+) exactly once,
  # whether or not writing the chunk raises.
  Zlib::GzipWriter.wrap(tmp) { |gw| chunk.write_to(gw) }
end

#configure(conf) ⇒ Object



25
26
27
# File 'lib/fluent/plugin/out_oss.rb', line 25

# Configuration hook; hands +conf+ to the parent implementation unchanged
# and adds no behavior of its own.
#
# @param conf the configuration object supplied by the caller
def configure(conf)
  super(conf)
end

#format(tag, time, record) ⇒ Object



71
72
73
# File 'lib/fluent/plugin/out_oss.rb', line 71

# Serializes one event as a single newline-terminated JSON document with
# the keys +tag+, +timestamp+ and +log+.
#
# @param tag the event tag
# @param time the event time, emitted under +timestamp+
# @param record the event payload, emitted under +log+
# @return [String] the JSON text followed by "\n"
def format(tag, time, record)
  event = { tag: tag, timestamp: time, log: record }
  "#{event.to_json}\n"
end

#process_object_key_format(chunk, key_format) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_oss.rb', line 44

# Expands the +%{...}+ placeholders of +key_format+ into an object key.
#
# Supported placeholders:
#   %{host}       - Socket.gethostname
#   %{time_slice} - chunk.key
#   %{uuid}       - SecureRandom.hex(4) (8 random hex characters)
#   %{file_ext}   - the literal "gz"
#
# Placeholders are substituted one after another in the order listed.
#
# @param chunk [#key] chunk whose key fills +%{time_slice}+
# @param key_format [String] template containing placeholders
# @return [String] a new string with all known placeholders replaced
def process_object_key_format(chunk, key_format)
  substitutions = {
    host: Socket.gethostname,
    time_slice: chunk.key,
    uuid: SecureRandom.hex(4),
    file_ext: 'gz'
  }
  substitutions.reduce(key_format) do |formatted, (name, value)|
    formatted.gsub("%{#{name}}", value)
  end
end

#start ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_oss.rb', line 58

# Startup hook: after the parent's start logic, builds the Aliyun OSS client
# from the configured endpoint and credentials, verifies that the configured
# bucket exists, and keeps a handle to it in +@bucket+.
#
# @raise [RuntimeError] when the bucket named by +@oss_bucket+ does not exist
def start
  super

  # Point the Aliyun SDK's log file at /dev/null.
  Aliyun::Common::Logging.set_log_file('/dev/null')

  @client = Aliyun::OSS::Client.new(
    endpoint: @oss_endpoint,
    access_key_id: @oss_key_id,
    access_key_secret: @oss_key_secret
  )

  unless @client.bucket_exists?(@oss_bucket)
    raise "Specific bucket not exists: #{@oss_bucket}"
  end

  @bucket = @client.get_bucket(@oss_bucket)
end

#write(chunk) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/out_oss.rb', line 75

# Compresses +chunk+ into a temporary file and uploads that file to the
# bucket under the key derived from +@oss_object_key_format+.
#
# The temporary file is always closed and unlinked afterwards, whether the
# upload succeeded or not.
#
# @param chunk the buffer chunk to upload
# @raise [RuntimeError] when +resumable_upload+ returns a falsy value
def write(chunk)
  # Created outside the protected region: if Tempfile.new itself fails there
  # is nothing to clean up, and the original error must not be masked by a
  # NoMethodError from calling #close on nil in the ensure clause.
  f = Tempfile.new('oss-')
  begin
    compress(chunk, f)
    path = process_object_key_format(chunk, @oss_object_key_format)
    raise "Upload #{f.path} failed" unless @bucket.resumable_upload(path, f.path)
  ensure
    # close(true) also unlinks the file.
    f.close(true)
  end
end