Class: Fluent::Plugin::AvroFormatter

Inherits:
Formatter
  • Object
show all
Defined in:
lib/fluent/plugin/formatter_avro.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/formatter_avro.rb', line 17

def configure(conf)
  super
  if not ((@schema_json.nil? ? 0 : 1) + (@schema_file.nil? ? 0 : 1) + (@schema_url.nil? ? 0 : 1) == 1) then
    raise Fluent::ConfigError, 'schema_json, schema_file, or schema_url is required, but not multiple!'
  end
  if (@schema_json.nil? && !@schema_file.nil?) then
    @schema_json = File.read(@schema_file)
  end
  if (@schema_json.nil? && !@schema_url.nil?) then
    @schema_json = fetch_schema(@schema_url,@schema_url_key)
  end
  @schema = Avro::Schema.parse(@schema_json)
  @writer = Avro::IO::DatumWriter.new(@schema)
end

#fetch_schema(url, schema_key) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/formatter_avro.rb', line 64

def fetch_schema(url,schema_key)
  response_body = fetch_url(url)
  if schema_key.nil? then
    return response_body
  else
    return JSON.parse(response_body)[schema_key]
  end
end

#fetch_url(url) ⇒ Object



58
59
60
61
62
# File 'lib/fluent/plugin/formatter_avro.rb', line 58

def fetch_url(url)
  uri = URI.parse(url)
  response = Net::HTTP.get_response uri
  response.body
end

#format(tag, time, record) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/formatter_avro.rb', line 32

def format(tag, time, record)
  buffer = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(buffer)
  begin
    @writer.write(record, encoder)
  rescue => e
    raise e if schema_url.nil?
    schema_changed = false
    begin
      new_schema_json = fetch_schema(@schema_url,@schema_url_key)
      new_schema = Avro::Schema.parse(new_schema_json)
      schema_changed = (new_schema_json == @schema_json)
      @schema_json = new_schema_json
      @schema = new_schema
    rescue
    end
    if schema_changed then
      @writer = Avro::IO::DatumWriter.new(@schema)
      @writer.write(record, encoder)
    else
      raise e
    end
  end
  buffer.string
end