Class: Fluent::Plugin::AvroTurfFormatter

Inherits:
Formatter
  • Object
show all
Defined in:
lib/fluent/plugin/formatter_avro_turf_messaging.rb

Defined Under Namespace

Classes: AvroTurfVersionImcompatible

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/formatter_avro_turf_messaging.rb', line 44

def configure(conf)
  super

  @avro_turf = AvroTurf::Messaging.new(registry_url: @schema_registry_url, schemas_path: @schemas_path)
  if @schema
    schema_store = @avro_turf.instance_variable_get("@schema_store")
    raise AvroTurfVersionImcompatible.new("Cannot access @schema_store") unless schema_store
    schemas = schema_store.instance_variable_get("@schemas")
    raise AvroTurfVersionImcompatible.new("Cannot access @schemas in @schema_store") unless schemas
    Avro::Schema.real_parse(@schema, schemas)
  end
end

#format(tag, time, record) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/formatter_avro_turf_messaging.rb', line 57

def format(tag, time, record)
  if @schema
    schema_name = @schema["name"]
    namespace = @schema["namespace"]
  end

  if @schema_name_key
    schema_name ||= @exclude_schema_name_key ? record.delete(@schema_name_key) : record[@schema_name_key]
  end
  schema_name ||= @default_schema_name

  if @namespace_key
    namespace ||= @exclude_namespace_key ? record.delete(@namespace_key) : record[@namespace_key]
  end
  namespace ||= @default_namespace

  if @schema_version_key
    schema_version = @exclude_schema_version_key ? record.delete(@schema_version_key) : record[@schema_version_key]
  end

  @avro_turf.encode(record, schema_name: schema_name, namespace: namespace, version: schema_version)
end