Class: Fluent::Plugin::AvroTurfFormatter
- Inherits:
-
Formatter
- Object
- Formatter
- Fluent::Plugin::AvroTurfFormatter
show all
- Defined in:
- lib/fluent/plugin/formatter_avro_turf_messaging.rb
Defined Under Namespace
Classes: AvroTurfVersionImcompatible
Instance Method Summary
collapse
Instance Method Details
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/fluent/plugin/formatter_avro_turf_messaging.rb', line 44
# Builds the AvroTurf messaging client and, when an inline schema is
# configured, parses that schema against the client's schema table.
#
# The schema store and its table are read with instance_variable_get, i.e.
# from AvroTurf's internal state rather than a public accessor. If either is
# missing, the installed AvroTurf is reported as incompatible.
#
# @param conf the plugin configuration; forwarded to the parent via `super`
# @return the result of Avro::Schema.real_parse when @schema is set, else nil
# @raise [AvroTurfVersionImcompatible] when @schema is set but the store or
#   its schema table cannot be read from the AvroTurf client
def configure(conf)
  super

  @avro_turf = AvroTurf::Messaging.new(
    registry_url: @schema_registry_url,
    schemas_path: @schemas_path
  )
  return unless @schema

  store = @avro_turf.instance_variable_get("@schema_store")
  raise AvroTurfVersionImcompatible, "Cannot access @schema_store" unless store

  known_schemas = store.instance_variable_get("@schemas")
  raise AvroTurfVersionImcompatible, "Cannot access @schemas in @schema_store" unless known_schemas

  # The table is handed over as real_parse's second argument, presumably so
  # the parsed schema gets registered in it -- confirm against the Avro gem.
  Avro::Schema.real_parse(@schema, known_schemas)
end
|
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/fluent/plugin/formatter_avro_turf_messaging.rb', line 57
# Encodes one event record through the AvroTurf messaging client.
#
# The schema name and namespace are resolved in this order of precedence:
# the inline @schema ("name" / "namespace"), then the record field named by
# @schema_name_key / @namespace_key, then @default_schema_name /
# @default_namespace. The schema version comes only from the record field
# named by @schema_version_key and is nil when no such key is configured.
#
# When an exclude_* flag is set, the matching field is removed from the
# record before encoding. Note that this mutates the record passed in.
#
# @param tag unused
# @param time unused
# @param record the event record; read with [] and delete using the
#   configured key names
# @return whatever @avro_turf.encode returns for the record
def format(tag, time, record)
  # Fetch a routing field: nil when no key is configured, removed from the
  # record when its exclude flag is set, otherwise read and left in place.
  take = lambda do |key, exclude|
    next unless key

    exclude ? record.delete(key) : record[key]
  end

  # Extract every routing field up front. Doing this inside a `||=` chain
  # would skip the delete whenever a higher-priority source (the inline
  # schema) already supplied the value, leaving the field in the record
  # despite the exclude flag.
  name_from_record      = take.call(@schema_name_key, @exclude_schema_name_key)
  namespace_from_record = take.call(@namespace_key, @exclude_namespace_key)
  schema_version        = take.call(@schema_version_key, @exclude_schema_version_key)

  inline_name      = @schema && @schema["name"]
  inline_namespace = @schema && @schema["namespace"]

  schema_name = inline_name || name_from_record || @default_schema_name
  namespace   = inline_namespace || namespace_from_record || @default_namespace

  @avro_turf.encode(record, schema_name: schema_name, namespace: namespace, version: schema_version)
end
|