Class: Fluent::S3InputOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_s3_input.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3InputOutput

Returns a new instance of S3InputOutput.



22
23
24
25
26
27
# File 'lib/fluent/plugin/out_s3_input.rb', line 22

# Sets up the plugin instance and loads the libraries the plugin uses.
#
# The requires are issued here (at instantiation time) rather than at file
# scope, so the libraries are only loaded once an instance is created.
def initialize
  super
  require 'net/http'
  # #emit decompresses gzip objects with Zlib::GzipReader; require zlib
  # explicitly rather than relying on it being loaded indirectly.
  require 'zlib'
  require 'oj'
  require 'aws-sdk'
end

Instance Attribute Details

#s3 ⇒ Object

Returns the value of attribute s3.



20
21
22
# File 'lib/fluent/plugin/out_s3_input.rb', line 20

# Reader for the S3 client held in @s3.
#
# @return [Object] the client assigned by #configure (an Aws::S3::Client),
#   or nil if @s3 has not been assigned yet
def s3
  @s3
end

Instance Method Details

#configure(conf) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/out_s3_input.rb', line 29

# Builds the S3 client and stores it in @s3.
#
# The client is always created for region "us-east-1". Static credentials
# are handed to the client only when both @aws_key_id and @aws_sec_key are
# set; otherwise only the region is passed.
#
# @param conf the plugin configuration, forwarded to the superclass
# @return [Aws::S3::Client] the newly created client
def configure(conf)
  super

  client_options = { region: "us-east-1" }
  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  end

  @s3 = Aws::S3::Client.new(client_options)
end

#emit(tag, es, chain) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_s3_input.rb', line 42

# Re-emits each incoming event as the document stored in the S3 object the
# event points at.
#
# For every record, the bucket and object key are read from the record fields
# named by s3_bucket_key and s3_object_key_key. The object is downloaded via
# #s3, gunzipped when @uncompress is "gzip", parsed with Oj.load and emitted
# under @tag with the original event time.
#
# A failure while handling one record is logged as a warning and that record
# is skipped, so the remaining records are still processed and chain.next is
# still called. Errors are never raised to the caller.
#
# @param tag [String] tag of the incoming events (unused; output uses @tag)
# @param es event stream yielding (time, record) pairs
# @param chain output chain; chain.next is called once the stream is processed
def emit(tag, es, chain)
  es.each do |time, record|
    begin
      resp = s3.get_object(bucket: record[s3_bucket_key],
                           key: record[s3_object_key_key])
      payload =
        if @uncompress == "gzip"
          # wrap closes the reader (and the response body) when the block ends
          Zlib::GzipReader.wrap(resp.body) { |gz| gz.read }
        else
          resp.body.read
        end
      router.emit(@tag, time, Oj.load(payload))
    rescue => e
      # Keep going: one bad record must not drop the rest of the stream.
      log_s3_input_failure(e)
    end
  end
  chain.next
rescue => e
  # Anything raised outside per-record handling is logged, not propagated.
  log_s3_input_failure(e)
end

# Writes a warning describing a failure raised while processing events.
def log_s3_input_failure(e)
  $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}"
end
private :log_s3_input_failure