Class: Fluent::S3InputOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_s3_input.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3InputOutput

Returns a new instance of S3InputOutput.



32
33
34
35
36
37
# File 'lib/fluent/plugin/out_s3_input.rb', line 32

# Builds the plugin instance via the parent constructor, then loads the
# libraries this plugin relies on (HTTP, JSON parsing, AWS SDK).
def initialize
  super
  # Loaded after super, in the same order as before.
  %w[net/http oj aws-sdk].each { |library| require library }
end

Instance Attribute Details

#s3 ⇒ Object

Returns the value of attribute s3.



30
31
32
# File 'lib/fluent/plugin/out_s3_input.rb', line 30

# Reader for the @s3 instance variable.
# configure assigns an Aws::S3::Client to it; before that it is nil.
def s3
  @s3
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/out_s3_input.rb', line 39

# Applies the plugin configuration and creates the S3 client stored in @s3.
#
# The client is always created for @aws_region. Explicit credentials are
# passed only when both @aws_key_id and @aws_sec_key are set; otherwise the
# client is built without them.
#
# @param conf the configuration object handed to the parent implementation
def configure(conf)
  super

  client_options = { region: @aws_region }
  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  end

  @s3 = Aws::S3::Client.new(client_options)
end

#emit(tag, es, chain) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_s3_input.rb', line 67

# For every incoming event, downloads the S3 object the event refers to,
# parses its contents and emits one event per parsed row through the router
# under @tag.
#
# Processing per incoming event:
# * bucket and object key are read from the record using the names returned
#   by s3_bucket_key and s3_object_key_key;
# * the body is decompressed when the key's extension is listed in
#   @gzip_exts or @zip_exts (for zip, only the first entry is read);
# * the body is parsed according to @format ('json' or 'csv'); CSV input is
#   parsed with its first line as the header, yielding one hash per row;
# * each row's time is taken from the first key in @time_keys that has a
#   value in that row (parsed with @time_format); rows without one keep the
#   incoming event's time;
# * without @record_key the row is merged with the incoming record (when
#   @merge_record is set), the incoming record winning on conflicts; with
#   @record_key the row is nested under that key instead;
# * keys listed in @remove_keys are deleted from the outgoing record.
#
# Any StandardError is logged as a warning and swallowed; in that case
# chain.next is not called and remaining events are skipped.
#
# @param tag [String] tag of the incoming events (not used for output)
# @param es enumerable yielding (time, record) pairs
# @param chain object responding to #next, called after all events succeed
def emit(tag, es, chain)
  es.each do |time, record|
    s3_bucket = record[s3_bucket_key]
    s3_key = record[s3_object_key_key]
    s3_key_ext = s3_key.split('.').last
    resp = s3.get_object(bucket: s3_bucket, key: s3_key)

    input =
      if @gzip_exts.include?(s3_key_ext)
        Zlib::GzipReader.new(resp.body)
      elsif @zip_exts.include?(s3_key_ext)
        # get_next_entry only positions the stream on the first entry; the
        # entry's data is read from the stream itself, not from the entry.
        zip_stream = Zip::InputStream.new(resp.body)
        zip_stream.get_next_entry
        zip_stream
      else
        resp.body
      end

    default_record = @merge_record ? {}.merge(record) : {}

    s3_records =
      case @format
      when 'json'
        json_data = normalize_json(input.read)
        begin
          Oj.load(json_data)
        rescue Oj::ParseError => e
          $log.error e.to_s
          $log.error json_data
          [] # nothing to emit for an unparseable object
        end
      when 'csv'
        # One hash per data row, keyed by the header line.
        CSV.parse(input.read, headers: true).map(&:to_h)
      else
        raise "Unsupported format - #{@format}"
      end

    s3_records.each do |a_record|
      # Start from the incoming event's time for every row so a timestamp
      # parsed from one row never leaks into the following rows.
      event_time = time
      @time_keys.each do |time_key|
        raw_time = a_record[time_key]
        next if raw_time.nil?

        event_time = Time.strptime(raw_time, @time_format).to_f
        $log.debug "Reset time for #{time_key}, Setting time to #{event_time}"
        break
      end

      new_record =
        if @record_key.nil?
          a_record.merge(default_record)
        else
          default_record.merge(@record_key => a_record)
        end

      @remove_keys.each { |key_to_remove| new_record.delete(key_to_remove) }

      $log.debug "Emit - #{new_record}"
      router.emit(@tag, event_time, new_record)
    end
  end
  chain.next
rescue StandardError => e
  $log.warn "s3_input: #{e.class} #{e.message} #{Array(e.backtrace).join(', ')}"
end

#normalize_json(json) ⇒ Object

Allows JSON data in several formats:

{} — a single event

[{},{}] — an array of events

{}\n{}\n{} — newline-concatenated events (Flume)



59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_s3_input.rb', line 59

# Normalizes a JSON payload so that it always parses to an array of events.
#
# Accepted shapes:
# * a JSON array (optionally preceded by whitespace) - returned unchanged;
# * a single object, or comma-separated objects - wrapped in [ ];
# * objects separated by line breaks (LF or CRLF, optionally with
#   surrounding blanks) - the separators become commas, then wrapped in [ ].
#
# A raw line break cannot occur inside a valid JSON string, so a "}" / "{"
# pair split by a newline is always a boundary between two objects.
#
# @param json [String] raw JSON text
# @return [String] JSON text whose top-level value is an array
def normalize_json(json)
  return json if json.lstrip.start_with?('[')

  "[#{json.gsub(/\}[ \t]*\r?\n\s*\{/, '},{')}]"
end