Class: Fluent::Plugin::AvroParser

Inherits: Parser (ultimately Object)
Defined in: lib/fluent/plugin/parser_avro.rb

Constant Summary

# Single 0x00 byte (binary encoding) that opens every Confluent wire-format message.
MAGIC_BYTE = "\x00".b.freeze

Instance Method Summary

Instance Method Details

#configure(conf) ⇒ Object

(source lines 48–97)
# File 'lib/fluent/plugin/parser_avro.rb', line 48

# Validates the schema-related options and builds the Avro datum reader.
#
# One schema mode is chosen, checked in this order:
# 1. Writers/readers pair: used when at least one writers option AND at least one
#    readers option is set. Each side must then be given by exactly one of its
#    +_json+ / +_file+ options. The reader resolves the writers schema against the
#    readers schema.
# 2. Schema registry: used when +avro_registry+ is set. The schema is looked up by
#    subject, key and version through ConfluentAvroSchemaRegistry.
# 3. Single schema: exactly one of +schema_json+, +schema_file+, +schema_url+.
#
# @param conf the plugin configuration handed to +super+
# @raise [Fluent::ConfigError] when a required schema option is missing or duplicated
def configure(conf)
  super

  writers_sources = [@writers_schema_json, @writers_schema_file].compact
  readers_sources = [@readers_schema_json, @readers_schema_file].compact

  if !writers_sources.empty? && !readers_sources.empty?
    if writers_sources.size != 1
      raise Fluent::ConfigError, "writers_schema_json, writers_schema_file is required, but they cannot specify at the same time!"
    end
    if readers_sources.size != 1
      raise Fluent::ConfigError, "readers_schema_json, readers_schema_file is required, but they cannot specify at the same time!"
    end

    # A file path wins over inline JSON; the size checks above guarantee only one is set.
    @writers_raw_schema = @writers_schema_file ? File.read(@writers_schema_file) : @writers_schema_json
    @readers_raw_schema = @readers_schema_file ? File.read(@readers_schema_file) : @readers_schema_json

    @writers_schema = Avro::Schema.parse(@writers_raw_schema)
    @readers_schema = Avro::Schema.parse(@readers_raw_schema)
    @reader = Avro::IO::DatumReader.new(@writers_schema, @readers_schema)
  elsif @avro_registry
    registry = @avro_registry
    @confluent_registry = Fluent::Plugin::ConfluentAvroSchemaRegistry.new(registry.url, @api_key, @api_secret)
    @raw_schema = @confluent_registry.subject_version(registry.subject,
                                                      registry.schema_key,
                                                      registry.schema_version)
    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  else
    if [@schema_json, @schema_file, @schema_url].compact.size != 1
      raise Fluent::ConfigError, "schema_json, schema_file, or schema_url is required, but they cannot specify at the same time!"
    end

    @raw_schema =
      case
      when @schema_file then File.read(@schema_file)
      when @schema_url then fetch_schema(@schema_url, @schema_url_key)
      else @schema_json
      end

    @schema = Avro::Schema.parse(@raw_schema)
    @reader = Avro::IO::DatumReader.new(@schema)
  end
end

#fetch_schema(url, schema_key) ⇒ Object

(source lines 165–179)
# File 'lib/fluent/plugin/parser_avro.rb', line 165

# Downloads a schema document with an HTTP(S) GET request.
#
# The request targets the URL's path and query string. HTTPS is used when the
# scheme is "https". Basic auth is added when both +@api_key+ and +@api_secret+
# are set.
#
# @param url [String] the schema endpoint URL
# @param schema_key [String, nil] when nil, the raw response body is returned;
#   otherwise the body is parsed with Yajl and the value under this key is returned
# @return [Object] the raw body, or the value stored under +schema_key+
def fetch_schema(url, schema_key)
  uri = URI.parse(url)
  response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
    # request_uri keeps the query string and falls back to "/" for an empty path.
    # uri.path would drop "?..." and make Net::HTTP::Get raise for "http://host".
    request = Net::HTTP::Get.new(uri.request_uri)
    request.basic_auth(@api_key, @api_secret) if @api_key && @api_secret
    http.request(request)
  end
  # NOTE(review): the HTTP status is not checked; a non-2xx body is returned as-is.
  schema_key.nil? ? response.body : Yajl.load(response.body)[schema_key]
end

#parse(data) ⇒ Object

(source lines 103–163)
# File 'lib/fluent/plugin/parser_avro.rb', line 103

# Decodes one Avro-encoded binary payload and yields its time and record.
#
# When the Confluent format is enabled (+@use_confluent_schema+ or +@avro_registry+),
# the payload is expected to be laid out as:
#
#   MAGIC_BYTE | schema_id | record
#   ----------:|:---------:|:---------------
#    1byte     |  4bytes   | record contents
#
# If decoding raises EOFError or RuntimeError and the schema has a remote source
# (exactly one of +@schema_url+ / +@avro_registry+), the schema is fetched again.
# If the new schema differs from the cached +@raw_schema+, the reader is rebuilt
# and decoding is retried once. Otherwise the original error is re-raised.
#
# @param data [String] the raw binary payload
# @yieldparam time the value produced by +convert_values+ from +parse_time+
# @yieldparam record the record produced by +convert_values+
# @raise [EOFError, RuntimeError] when decoding fails and no changed schema is available
def parse(data)
  # Set by +decode+ when the Confluent header is read. The registry branch of the
  # rescue uses it to look up the writer schema. It stays nil if the header
  # could not be read.
  schema_id = nil

  # Decodes +data+ from its first byte with the current @reader and returns
  # [time, record]. It is shared by the first attempt and the post-refresh retry.
  decode = lambda do
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(data))
    if @use_confluent_schema || @avro_registry
      magic_byte = decoder.read(1)
      if magic_byte != MAGIC_BYTE
        raise "The first byte should be magic byte but got #{magic_byte.inspect}"
      end
      schema_id = decoder.read(4).unpack("N").first
    end
    decoded_data = @reader.read(decoder)
    convert_values(parse_time(decoded_data), decoded_data)
  end

  begin
    time, record = decode.call
  rescue EOFError, RuntimeError => e
    # Only a remotely sourced schema can be refreshed; otherwise propagate.
    raise e unless [@schema_url, @avro_registry].compact.size == 1

    is_changed = false
    begin
      new_raw_schema = if @schema_url
                         fetch_schema(@schema_url, @schema_url_key)
                       elsif @avro_registry
                         @confluent_registry.schema_with_id(schema_id,
                                                            @avro_registry.schema_key)
                       end
      new_schema = Avro::Schema.parse(new_raw_schema)
      is_changed = (new_raw_schema != @raw_schema)
      @raw_schema = new_raw_schema
      @schema = new_schema
    rescue EOFError, RuntimeError
      # Best effort: if the refresh fails, fall through and re-raise the original error.
    end
    raise e unless is_changed

    @reader = Avro::IO::DatumReader.new(@schema)
    time, record = decode.call
  end

  # Yield outside the begin/rescue. Otherwise an error raised by the caller's
  # block would trigger a schema refresh and a second decode/yield of the same
  # payload.
  yield time, record
end

#parser_type ⇒ Object

(source lines 99–101)
# File 'lib/fluent/plugin/parser_avro.rb', line 99

# Reports the input kind this parser accepts: raw binary payloads, not text lines.
#
# @return [Symbol] always +:binary+
def parser_type
  :binary # Avro data is a byte stream
end