Class: Fluent::Plugin::AvroFormatter
- Inherits:
- Formatter
- Inheritance chain: Object → Formatter → Fluent::Plugin::AvroFormatter
- Defined in:
- lib/fluent/plugin/formatter_avro.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #fetch_schema(url, schema_key) ⇒ Object
- #fetch_url(url) ⇒ Object
- #format(tag, time, record) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/fluent/plugin/formatter_avro.rb', line 17
#
# Validates the schema settings, loads the schema JSON and prepares the
# Avro schema and datum writer used when formatting records.
#
# Exactly one of @schema_json, @schema_file and @schema_url must be non-nil.
# When the JSON is not given inline it is read from @schema_file, or fetched
# from @schema_url (using @schema_url_key to pick a field of the response).
#
# @param conf [Object] configuration, passed on implicitly to the parent via super
# @raise [Fluent::ConfigError] unless exactly one schema source is set
def configure(conf)
  super

  configured_sources = [@schema_json, @schema_file, @schema_url].count { |source| !source.nil? }
  unless configured_sources == 1
    raise Fluent::ConfigError, 'schema_json, schema_file, or schema_url is required, but not multiple!'
  end

  # Past the check above exactly one source is set, so when the inline JSON
  # is absent the schema comes from the file if given, otherwise from the URL.
  if @schema_json.nil?
    @schema_json =
      if @schema_file.nil?
        fetch_schema(@schema_url, @schema_url_key)
      else
        File.read(@schema_file)
      end
  end

  @schema = Avro::Schema.parse(@schema_json)
  @writer = Avro::IO::DatumWriter.new(@schema)
end
#fetch_schema(url, schema_key) ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/fluent/plugin/formatter_avro.rb', line 64
#
# Downloads a schema from +url+ via fetch_url.
#
# @param url [String] location handed to fetch_url
# @param schema_key [String, nil] when nil the raw response body is returned;
#   otherwise the body is parsed as JSON and the value under this key is returned
# @return [Object] the response body, or the value stored under +schema_key+
def fetch_schema(url, schema_key)
  body = fetch_url(url)
  return body if schema_key.nil?

  JSON.parse(body)[schema_key]
end
#fetch_url(url) ⇒ Object
58 59 60 61 62 |
# File 'lib/fluent/plugin/formatter_avro.rb', line 58
#
# Performs an HTTP GET on +url+ and returns the response body.
# The response status is not inspected.
#
# @param url [String] address to request
# @return [String, nil] body of the HTTP response
def fetch_url(url)
  Net::HTTP.get_response(URI.parse(url)).body
end
#format(tag, time, record) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/fluent/plugin/formatter_avro.rb', line 32
#
# Encodes +record+ with the current Avro datum writer and returns the bytes.
#
# If writing fails and a schema URL is configured, the schema is re-fetched.
# When the fetched schema JSON differs from the one currently in use, the
# writer is rebuilt from it and the record is encoded once more; otherwise
# the original error is re-raised.
#
# @param tag [Object] unused by this method
# @param time [Object] unused by this method
# @param record [Object] datum passed to the Avro writer
# @return [String] contents of the encoding buffer
# @raise [StandardError] the original write error when no schema URL is set,
#   the schema could not be refreshed, or the refreshed schema is unchanged;
#   or the error from the retry if the second write fails too
def format(tag, time, record)
  buffer = StringIO.new
  begin
    @writer.write(record, Avro::IO::BinaryEncoder.new(buffer))
  rescue => e
    # Without a schema URL there is nothing to refresh from.
    raise e if @schema_url.nil?

    schema_changed = false
    begin
      new_schema_json = fetch_schema(@schema_url, @schema_url_key)
      new_schema = Avro::Schema.parse(new_schema_json)
      # Retry only makes sense when the remote schema actually differs.
      schema_changed = (new_schema_json != @schema_json)
      @schema_json = new_schema_json
      @schema = new_schema
    rescue StandardError
      # Best effort: if the refresh itself fails, fall through and re-raise
      # the original write error below.
    end

    raise e unless schema_changed

    @writer = Avro::IO::DatumWriter.new(@schema)
    # Start from an empty buffer so bytes emitted by the failed attempt
    # cannot end up in front of the retried encoding.
    buffer = StringIO.new
    @writer.write(record, Avro::IO::BinaryEncoder.new(buffer))
  end
  buffer.string
end