Class: Fluent::S3InputOutput
- Inherits: Output (ancestry: Object > Output > Fluent::S3InputOutput)
- Defined in:
- lib/fluent/plugin/out_s3_input.rb
Instance Attribute Summary collapse
- #s3 ⇒ Object
  Returns the value of attribute s3.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #initialize ⇒ S3InputOutput (constructor)
  A new instance of S3InputOutput.
- #normalize_json(json) ⇒ Object
  Allow JSON data in a couple of formats: {} (single event), [{},{}] (array of events), or {}\n{}\n{} (concatenated events, as written by Flume).
Constructor Details
#initialize ⇒ S3InputOutput
Returns a new instance of S3InputOutput.
32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_s3_input.rb', line 32
# Construct the output plugin. The HTTP library, the Oj JSON parser and the
# AWS SDK are loaded here, at instantiation time, rather than at file load.
def initialize
  super
  %w[net/http oj aws-sdk].each { |library| require library }
end
Instance Attribute Details
#s3 ⇒ Object
Returns the value of attribute s3.
30 31 32 |
# File 'lib/fluent/plugin/out_s3_input.rb', line 30
# Reader for the S3 client that #configure stores in @s3.
def s3
  @s3
end
Instance Method Details
#configure(conf) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_s3_input.rb', line 39
# Configure the plugin and build the S3 client used by #emit.
#
# The client always receives @aws_region. Explicit credentials
# (@aws_key_id / @aws_sec_key) are passed only when BOTH are set;
# otherwise no credentials are passed and the SDK is left to resolve them
# on its own (presumably via its default provider chain — see AWS SDK docs).
#
# @param conf plugin configuration, forwarded to the parent's #configure
def configure(conf)
  super

  options = { region: @aws_region }
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  @s3 = Aws::S3::Client.new(**options)
end
#emit(tag, es, chain) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/fluent/plugin/out_s3_input.rb', line 67
# Fetch the S3 object referenced by each incoming record, parse it
# according to @format and emit the result under @tag.
#
# Every event is processed independently: an error while fetching,
# decompressing or parsing one object is logged via $log.warn and the
# remaining events are still processed. +chain.next+ is called once the
# whole stream has been handled.
#
# @param tag [String] incoming tag (not used; output is emitted with @tag)
# @param es event stream yielding (time, record) pairs
# @param chain output chain to continue after processing
def emit(tag, es, chain)
  es.each do |time, record|
    begin
      emit_s3_object(time, record)
    rescue StandardError => e
      $log.warn "s3_input: #{e.class} #{e.message} #{e.backtrace.join(', ')}"
    end
  end
  chain.next
end

# Build and emit the output event for a single incoming +record+.
def emit_s3_object(time, record)
  bucket = record[s3_bucket_key]
  key = record[s3_object_key_key]
  data = read_s3_object(bucket, key)
  s3_record = parse_s3_data(data, bucket, key)

  new_record = @merge_record ? {}.merge(record) : {}
  time = extract_event_time(new_record, time)

  if @record_key.nil?
    # NOTE(review): parse_s3_data returns an Array for JSON (normalize_json
    # always yields a top-level array) and a String for CSV; neither has
    # #merge, so this branch raises for those results (logged by #emit).
    # Confirm the intended record shape.
    new_record = s3_record.merge(new_record)
  else
    new_record[@record_key] = s3_record
  end

  @remove_keys.each { |key_to_remove| new_record.delete(key_to_remove) }
  router.emit(@tag, time, new_record)
end

# Download s3://bucket/key and return its contents as a String.
# The key's last dot-separated segment selects the decoder: @gzip_exts ->
# gzip, @zip_exts -> first entry of a zip archive ('' if the archive is
# empty), anything else -> raw body.
def read_s3_object(bucket, key)
  ext = key.split('.').last
  body = s3.get_object(bucket: bucket, key: key).body

  if @gzip_exts.include?(ext)
    gz = Zlib::GzipReader.new(body)
    begin
      gz.read
    ensure
      gz.close
    end
  elsif @zip_exts.include?(ext)
    zip = Zip::InputStream.new(body)
    begin
      # get_next_entry only positions the stream on the entry (it returns a
      # Zip::Entry, which has no #read); the data is read from the stream.
      zip.get_next_entry ? zip.read : ''
    ensure
      zip.close
    end
  else
    body.read
  end
end

# Parse raw object +data+ according to @format.
# JSON parse errors are logged and yield {} (best effort, as before);
# an unknown @format raises.
def parse_s3_data(data, bucket, key)
  case @format
  when 'json'
    json_data = normalize_json(data)
    begin
      Oj.load(json_data)
    rescue Oj::ParseError => e
      $log.warn "s3_input: failed to parse JSON from s3://#{bucket}/#{key}: #{e}"
      {}
    end
  when 'csv'
    CSV.parse(data).to_json
  else
    raise "Unsupported format - #{@format}"
  end
end

# Return the time parsed (with @time_format) from the first of @time_keys
# present in +record+, or +default+ when none of them is present.
def extract_event_time(record, default)
  time_key = @time_keys.find { |candidate| record.key?(candidate) }
  return default unless time_key

  $log.debug "s3_input: setting time from #{time_key}"
  Time.strptime(record[time_key], @time_format).to_i
end

private :emit_s3_object, :read_s3_object, :parse_s3_data, :extract_event_time
#normalize_json(json) ⇒ Object
Allow JSON data in a couple of formats:
- {} single event
- [{},{}] array of events
- {}\n{}\n{} concatenated events (flume)
59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_s3_input.rb', line 59
# Normalize an S3 JSON payload into JSON text whose top-level value is an
# array (assuming the input is valid JSON in one of the forms below).
#
# Accepted inputs:
# - {}            a single event           -> [{}]
# - [{},{}]       an array of events       -> returned unchanged
# - {}\n{}\n{}    newline-concatenated events (Flume style) -> [{},{},{}]
#
# Leading whitespace is ignored when detecting an array, and the separator
# between concatenated events may include CR and blank lines. The split
# requires a raw newline, which cannot appear inside a valid JSON string,
# so string contents are never rewritten.
#
# @param json [String] raw object body
# @return [String]
def normalize_json(json)
  return json if json.lstrip.start_with?('[')

  "[#{json.gsub(/\}\s*\n\s*\{/, '},{')}]"
end