Class: Fluent::S3InputOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_s3_input.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ S3InputOutput

Returns a new instance of S3InputOutput.



32
33
34
35
36
37
# File 'lib/fluent/plugin/out_s3_input.rb', line 32

# Creates the plugin and loads the libraries it uses. The requires are done
# here (lazily, at instantiation) rather than at file load, as in the original.
def initialize
  super
  require 'net/http'
  require 'oj'
  require 'aws-sdk'
  # emit depends on these stdlib libraries; load them explicitly instead of
  # relying on some other component having required them first.
  require 'zlib' # Zlib::GzipReader for objects whose extension is in gzip_exts
  require 'csv'  # CSV.parse for format "csv"
  require 'time' # Time.strptime for time_keys
  # NOTE(review): emit also uses Zip::InputStream (rubyzip gem) for zip_exts;
  # confirm 'zip' is required elsewhere in this file.
end

Instance Attribute Details

#s3 ⇒ Object

Returns the value of attribute s3.



30
31
32
# File 'lib/fluent/plugin/out_s3_input.rb', line 30

# Reader for the S3 client stored in @s3 (assigned by #configure); yields nil
# while @s3 has not been assigned yet.
def s3
  @s3 if defined?(@s3)
end

Instance Method Details

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/out_s3_input.rb', line 39

# Runs the base configuration, then builds the Aws::S3::Client stored in @s3
# (used by #emit via the #s3 reader) for @aws_region.
#
# Static credentials are passed to the client only when BOTH @aws_key_id and
# @aws_sec_key are set. Otherwise no credentials are passed, and the SDK
# resolves them on its own (TODO confirm: aws-sdk default credential chain).
#
# @param conf plugin configuration, forwarded unchanged to super
def configure(conf)
  super

  client_options = { region: @aws_region }
  if @aws_key_id && @aws_sec_key
    client_options[:access_key_id] = @aws_key_id
    client_options[:secret_access_key] = @aws_sec_key
  elsif @aws_key_id || @aws_sec_key
    # Previously a half-configured key pair was dropped without any notice.
    $log.warn "s3_input: both aws_key_id and aws_sec_key are required for static credentials; ignoring the one that is set"
  end
  @s3 = Aws::S3::Client.new(client_options)
end

#emit(tag, es, chain) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_s3_input.rb', line 67

# Handles a batch of events whose records point at S3 objects. For each record,
# the object named by record[s3_bucket_key] / record[s3_object_key_key] is
# downloaded, decompressed according to its key extension (@gzip_exts /
# @zip_exts), parsed according to @format, and re-emitted under @tag.
#
# When @record_key is nil, every Hash parsed from the object is emitted as its
# own event. If @merge_record is set, the triggering record's fields are merged
# in, and those fields win on key conflicts. When @record_key is set, a single
# event is emitted with the whole parsed payload stored under that key.
#
# @param tag [String] incoming tag (not used; events are re-emitted under @tag)
# @param es event stream yielding [time, record] pairs
# @param chain output chain; #next is called once after the whole batch
def emit(tag, es, chain)
  es.each do |time, record|
    begin
      emit_s3_input_object(time, record)
    rescue StandardError => e
      # Failures are isolated per S3 object so one bad object neither drops
      # the rest of the batch nor skips chain.next.
      $log.warn "s3_input: #{e.class} #{e.message} #{Array(e.backtrace).join(', ')}"
    end
  end
  chain.next
end

# Downloads, parses and re-emits the S3 object referenced by one record.
private def emit_s3_input_object(time, record)
  s3_bucket = record[s3_bucket_key]
  s3_key = record[s3_object_key_key]
  unless s3_bucket && s3_key
    $log.warn "s3_input: record is missing #{s3_bucket_key} or #{s3_object_key_key}, skipping"
    return
  end

  payload = parse_s3_input(read_s3_input(s3_bucket, s3_key))
  # Copy so the incoming record is never mutated.
  base = @merge_record ? {}.merge(record) : {}

  if @record_key.nil?
    # JSON input is always an Array after normalize_json; each element is one event.
    events = payload.is_a?(Array) ? payload : [payload]
    events.each do |event|
      unless event.is_a?(Hash)
        $log.warn "s3_input: cannot merge #{event.class} parsed from s3://#{s3_bucket}/#{s3_key}; set record_key to keep it"
        next
      end
      emit_s3_input_record(s3_input_time(event, time), event.merge(base))
    end
  else
    new_record = base
    new_record[@record_key] = payload
    event_time = payload.is_a?(Hash) ? s3_input_time(payload, time) : time
    emit_s3_input_record(event_time, new_record)
  end
end

# Fetches an S3 object and returns its contents as a String. Objects whose key
# extension (the last "."-separated segment) is in @gzip_exts are gunzipped;
# for @zip_exts, the first archive member is read.
private def read_s3_input(s3_bucket, s3_key)
  body = s3.get_object(bucket: s3_bucket, key: s3_key).body
  s3_key_ext = s3_key.split(".")[-1]
  input =
    if @gzip_exts.include?(s3_key_ext)
      Zlib::GzipReader.new(body)
    elsif @zip_exts.include?(s3_key_ext)
      zip = Zip::InputStream.new(body)
      # get_next_entry positions the stream on the first member; the member's
      # bytes are then read from the stream itself, not from the entry object.
      zip.get_next_entry
      zip
    else
      body
    end
  begin
    input.read
  ensure
    # GzipReader in particular must be closed explicitly.
    input.close if input.respond_to?(:close)
  end
end

# Parses raw object data according to @format.
# json: the Oj-parsed value of normalize_json(data), or {} if it fails to parse.
# csv:  the rows re-encoded as a JSON String (kept as-is for existing consumers;
#       a String payload can only be stored under record_key).
private def parse_s3_input(data)
  case @format
  when 'json'
    json_data = normalize_json(data)
    begin
      # NOTE(review): Oj.load follows Oj.default_options; Oj's :object mode can
      # instantiate classes named in the input. Confirm the mode used for
      # untrusted S3 data (e.g. pass mode: :strict).
      Oj.load(json_data)
    rescue Oj::ParseError => e
      $log.warn "s3_input: failure parsing JSON (#{e.message}): #{json_data[0, 1000]}"
      {}
    end
  when 'csv'
    CSV.parse(data).to_json
  else
    raise "Unsupported format - #{@format}"
  end
end

# Returns epoch seconds parsed (with @time_format) from the first of @time_keys
# present in source, or default_time when no key is present or the value does
# not match @time_format.
private def s3_input_time(source, default_time)
  time_key = @time_keys.find { |key| source.key?(key) }
  return default_time unless time_key
  Time.strptime(source[time_key].to_s, @time_format).to_i
rescue ArgumentError => e
  $log.warn "s3_input: cannot parse #{time_key}=#{source[time_key].inspect} with #{@time_format}: #{e.message}"
  default_time
end

# Drops @remove_keys from record (in place) and emits it under @tag.
private def emit_s3_input_record(time, record)
  @remove_keys.each { |key_to_remove| record.delete(key_to_remove) }
  router.emit(@tag, time, record)
end

#normalize_json(json) ⇒ Object

Allow JSON data in any of these formats:

`{}` — a single event

`[{},{}]` — an array of events

`{}\n{}\n{}` — newline-concatenated events (Flume style)



59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_s3_input.rb', line 59

# Normalizes the accepted JSON layouts into a single JSON array string:
#   {}             single event          -> [{}]
#   {},{}          comma-joined events   -> [{},{}]
#   {}\n{}\n{}     concatenated events   -> [{},{},{}]  (LF or CRLF, blank lines ok)
#   [ ... ]        already an array      -> unchanged (surrounding whitespace trimmed)
# Joining only across a newline is safe: a raw newline cannot occur inside a
# valid JSON string, so a "}<newline>{" match always lies between documents.
#
# @param json [String, nil] raw object contents; nil is treated as empty
# @return [String] JSON text whose top-level value is an array
def normalize_json(json)
  json = json.to_s.strip
  return json if json.start_with?('[')
  "[#{json.gsub(/\}\s*\n\s*\{/, '},{')}]"
end