Module: Deimos
- Includes:
- FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
Generates a migration for bulk import ID in consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils
Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, SchemaField, SchemaRoute, Transcoder
Constant Summary
collapse
- EVENT_TYPES =
%w(
deimos.ar_consumer.consume_batch
deimos.encode_message
deimos.batch_consumption.invalid_records
deimos.batch_consumption.valid_records
deimos.outbox.produce
).freeze
- VERSION =
'2.3.0'
Class Method Summary
collapse
-
.decode(schema:, namespace:, payload:) ⇒ Hash?
-
.decode_message(message) ⇒ Object
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
-
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
-
.generate_key_schemas ⇒ Object
-
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
-
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
Array<Karafka::Routing::Topic>.
-
.load_generated_schema_classes ⇒ void
-
.producer_for(topic) ⇒ Object
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
-
.schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend) ⇒ Class<Deimos::SchemaBackends::Base>
-
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
-
.setup_karafka ⇒ Object
-
.setup_producers ⇒ Object
-
.start_outbox_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
-
.topic_for_consumer(handler_class) ⇒ String?
-
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ Hash?
102
103
104
|
# File 'lib/deimos.rb', line 102
# Decodes an encoded payload using the backend built for the given schema.
#
# @param schema [String] schema name handed to {schema_backend}
# @param namespace [String] schema namespace handed to {schema_backend}
# @param payload [Object] the encoded payload to decode
# @return [Object] whatever the backend's +decode+ returns
def decode(schema:, namespace:, payload:)
  backend = schema_backend(schema: schema, namespace: namespace)
  backend.decode(payload)
end
|
.decode_message(message) ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/deimos.rb', line 107
# Decodes the payload (and, when possible, the key) of a message hash in place,
# using the deserializers of the Karafka topic configuration for its topic.
#
# The configured producer topic prefix, if any, is stripped from the start of
# the topic name before the configuration is looked up. If no configuration is
# found the message is returned untouched.
#
# @param message [Hash] message with +:topic+, +:payload+ and optionally +:key+
# @return [Hash] the same (mutated) message hash
def decode_message(message)
  topic = message[:topic]
  prefix = Deimos.config.producers.topic_prefix
  # Strip only a leading prefix: a plain `sub` would also cut the prefix text
  # out of the middle of a topic name that merely contains it.
  topic = topic.delete_prefix(prefix) if prefix

  config = karafka_config_for(topic: topic)
  return message unless config

  deserializers = config.deserializers
  message[:payload] = deserializers[:payload].decode_message_hash(message[:payload])

  # Not every key deserializer can decode a hash, so check before calling it.
  key_deserializer = deserializers[:key]
  if message[:key] && key_deserializer.respond_to?(:decode_message_hash)
    message[:key] = key_deserializer.decode_message_hash(message[:key])
  end

  # Return the message on every path rather than the value of the last branch.
  message
end
|
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/deimos/producer.rb', line 16
# Runs the given block while producing is disabled for the current thread.
#
# With producer classes given, the work is delegated to
# +_disable_producer_classes+. Otherwise the thread-local
# +:frk_disable_all_producers+ flag is set for the duration of the block and
# reset to +false+ afterwards, even if the block raises. A nested call made
# while the flag is already set just yields and leaves the flag alone.
#
# @param producer_classes [Array<Class>] classes to limit the disabling to
# @yield the code to run with producers disabled
def disable_producers(*producer_classes, &block)
  flag = :frk_disable_all_producers

  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    nil
  elsif Thread.current[flag]
    # Already disabled by an outer call; the outer call owns the reset.
    yield
    nil
  else
    Thread.current[flag] = true
    begin
      yield
    ensure
      Thread.current[flag] = false
    end
  end
end
|
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
93
94
95
96
|
# File 'lib/deimos.rb', line 93
# Encodes a payload using the backend built for the given schema.
#
# @param schema [String] schema name handed to {schema_backend}
# @param namespace [String] schema namespace handed to {schema_backend}
# @param payload [Object] the payload to encode
# @param subject [String, nil] passed to the backend as +topic:+; defaults to
#   "<namespace>.<schema>" when not given
# @return [Object] whatever the backend's +encode+ returns
def encode(schema:, namespace:, payload:, subject: nil)
  backend = schema_backend(schema: schema, namespace: namespace)
  topic = subject || "#{namespace}.#{schema}"
  backend.encode(payload, topic: topic)
end
|
.generate_key_schemas ⇒ Object
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/deimos/config/configuration.rb', line 23
# Generates key schemas for every Karafka topic whose key deserializer exposes
# a +key_field+ and whose backend supports key schemas.
#
# For each eligible topic the deserializer's backend is replaced with one built
# from the topic's schema, namespace and the deserializer's backend type, and
# that backend is asked to generate the key schema.
#
# @return [Object] the result of iterating +Deimos.karafka_configs+
def generate_key_schemas
  Deimos.karafka_configs.each do |topic_config|
    key_transcoder = topic_config.deserializers[:key]

    # Skip deserializers that cannot or need not produce a key schema.
    next unless key_transcoder.respond_to?(:key_field)
    next unless key_transcoder.key_field
    next unless key_transcoder.backend.supports_key_schemas?

    key_transcoder.backend = Deimos.schema_backend(
      schema: topic_config.schema,
      namespace: topic_config.namespace,
      backend: key_transcoder.backend_type
    )
    key_transcoder.backend.generate_key_schema(key_transcoder.key_field)
  end
end
|
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
182
183
184
185
186
187
188
|
# File 'lib/deimos.rb', line 182
# Finds the Karafka topic configuration for a topic name or a producer class.
#
# A given topic always wins: when +topic+ is passed, only the name is matched
# and +producer+ is ignored, even if no topic of that name exists.
#
# @param topic [String, nil] topic name to match against each config's +name+
# @param producer [Class, nil] producer looked up in each config's
#   +producer_classes+ (configs without producer classes are skipped)
# @return [Object, nil] the first matching config, or nil
def karafka_config_for(topic: nil, producer: nil)
  return karafka_configs.find { |route| route.name == topic } if topic
  return unless producer

  karafka_configs.find { |route| route.producer_classes&.include?(producer) }
end
|
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
176
177
178
|
# File 'lib/deimos.rb', line 176
# Collects every topic from every entry in the Karafka routes into one list.
#
# @return [Array] all routed topics, flattened
def karafka_configs
  topic_collections = Karafka::App.routes.flat_map { |route| route.topics }
  topic_collections.flat_map { |topics| topics.to_a }
end
|
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/deimos/config/configuration.rb', line 40
# Requires every generated schema class file under
# +Deimos.config.schema.generated_class_path+ (searched recursively, relative
# to the current working directory), in sorted path order.
#
# @raise [RuntimeError] if no generated class path is configured
# @raise [RuntimeError] if requiring any of the files raises a LoadError
def load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. ' \
          'Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.
    each { |f| require f }
rescue LoadError
  # The trailing space after "with" matters: adjacent literals are joined as-is.
  raise 'Cannot load schema classes. Please regenerate classes with ' \
        'rake deimos:generate_schema_models.'
end
|
.producer_for(topic) ⇒ Object
196
197
198
|
# File 'lib/deimos.rb', line 196
# Returns the producer registered for a topic, falling back to the default
# Karafka producer when none is registered under that name.
#
# @param topic [String] topic name used as the lookup key in +@producers+
# @return [Object] the topic's producer or +Karafka.producer+
def producer_for(topic)
  dedicated = @producers[topic]
  dedicated || Karafka.producer
end
|
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
52
53
54
55
56
57
|
# File 'lib/deimos/producer.rb', line 52
# Reports whether producing is currently disabled.
#
# Checked in order: the +producers.disabled+ config setting, the thread-local
# flag that disables all producers, and finally whether +producer_class+ is in
# the thread-local list of disabled producer classes.
#
# @param producer_class [Class, nil] producer class to check in the list
# @return [Boolean, nil] truthy when disabled; nil when no flag or list is set
def producers_disabled?(producer_class=nil)
  return true if Deimos.config.producers.disabled

  all_disabled = Thread.current[:frk_disable_all_producers]
  return all_disabled if all_disabled

  Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
|
.schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend) ⇒ Class<Deimos::SchemaBackends::Base>
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/deimos.rb', line 70
# Builds a schema backend instance for the given schema and namespace.
#
# When +config.schema.use_schema_classes+ is set and a generated schema class
# exists for the pair, the schema and namespace reported by that class are
# used instead of the ones passed in.
#
# @param schema [String] schema name
# @param namespace [String] schema namespace
# @param backend [Symbol] backend type; defaults to the configured one
# @return [Object] a new instance of the resolved backend class
def schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend)
  resolved_schema = schema
  resolved_namespace = namespace

  if config.schema.use_schema_classes
    generated_class = Utils::SchemaClass.klass(schema, namespace)
    unless generated_class.nil?
      # allocate skips the initializer; only the schema metadata is read.
      generated = generated_class.allocate
      resolved_schema = generated.schema
      resolved_namespace = generated.namespace
    end
  end

  schema_backend_class(backend: backend).
    new(schema: resolved_schema, namespace: resolved_namespace)
end
|
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
59
60
61
62
63
64
65
|
# File 'lib/deimos.rb', line 59
# Loads and returns the schema backend class for a backend type.
#
# The backend file is required from +deimos/schema_backends/<backend>+ and the
# class is resolved as +Deimos::SchemaBackends::<Classified backend name>+.
#
# @param backend [Symbol, String, nil] backend type; falls back to
#   +Deimos.config.schema.backend+ when nil
# @return [Class] the backend class
def schema_backend_class(backend: nil)
  backend_name = backend || Deimos.config.schema.backend
  require "deimos/schema_backends/#{backend_name}"
  class_name = "Deimos::SchemaBackends::#{backend_name.to_s.classify}"
  class_name.constantize
end
|
.setup_karafka ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/deimos.rb', line 158
# Wires Deimos into Karafka and WaterDrop.
#
# After setting up the producers, every WaterDrop producer gets the Deimos
# producer middleware, a metrics listener, and an 'error.occurred' subscriber
# that logs failed messages. Finally each Deimos event type is registered on
# the Karafka notifications bus.
#
# @return [Object] the result of iterating +EVENT_TYPES+
def setup_karafka
  setup_producers

  waterdrop_producers.each do |producer|
    producer.middleware.append(Deimos::ProducerMiddleware)

    monitor = producer.monitor
    monitor.subscribe(ProducerMetricsListener.new)
    monitor.subscribe('error.occurred') do |event|
      # Only error events that carry the failed messages are logged here.
      next unless event.payload.key?(:messages)

      failed_messages = event[:messages]
      topic_config = Deimos.karafka_config_for(topic: failed_messages.first[:topic])
      message = Deimos::Logging.messages_log_text(topic_config&.payload_log, failed_messages)
      Karafka.logger.error("Error producing messages: #{event[:error].message} #{message.to_json}")
    end
  end

  EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) }
end
|
.setup_producers ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/deimos.rb', line 145
# Builds the topic-name => WaterDrop producer registry in +@producers+.
#
# Topics are grouped by their 'bootstrap.servers' setting: the first topic seen
# for a broker creates a producer from the global Karafka kafka settings merged
# with that topic's own kafka settings, and later topics on the same broker
# reuse it.
#
# @return [Object] the result of iterating +Deimos.karafka_configs+
def setup_producers
  @producers = {}
  by_broker = {}

  Deimos.karafka_configs.each do |topic|
    broker = topic.kafka[:'bootstrap.servers']

    unless by_broker[broker]
      by_broker[broker] = ::WaterDrop::Producer.new do |producer_config|
        merged_settings = Karafka::Setup::Config.config.kafka.merge(topic.kafka)
        producer_config.kafka = Karafka::Setup::AttributesMap.producer(merged_settings)
      end
    end

    @producers[topic.name] = by_broker[broker]
  end
end
|
.start_outbox_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/deimos.rb', line 124
# Starts the outbox producers that send stored messages to Kafka.
#
# One OutboxProducer is created per requested thread, each using the outbox
# logger from the config or the Karafka logger when none is configured. They
# are run through a Sigurd executor driven by a Sigurd signal handler.
#
# @param thread_count [Integer] number of outbox producers to run
# @raise [RuntimeError] if the producers backend is not +:outbox+
# @raise [RuntimeError] if +thread_count+ is nil or zero
def start_outbox_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true

  unless self.config.producers.backend == :outbox
    raise('Publish backend is not set to :outbox, exiting')
  end
  raise('Thread count is not given or set to zero, exiting') if thread_count.nil? || thread_count.zero?

  workers = (1..thread_count).map do
    worker_logger = self.config.outbox.logger || Karafka.logger
    Deimos::Utils::OutboxProducer.new(worker_logger)
  end

  pool = Sigurd::Executor.new(workers, sleep_seconds: 5, logger: Karafka.logger)
  Sigurd::SignalHandler.new(pool).run!
end
|
.topic_for_consumer(handler_class) ⇒ String?
201
202
203
204
205
206
207
208
|
# File 'lib/deimos.rb', line 201
# Looks up the name of the first Karafka topic consumed by the given class.
#
# @param handler_class [Class] consumer class to match against each topic's
#   +consumer+
# @return [String, nil] the topic name, or nil when no topic uses the class
def topic_for_consumer(handler_class)
  matching_topic = Deimos.karafka_configs.find { |topic| topic.consumer == handler_class }
  matching_topic&.name
end
|
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
191
192
193
|
# File 'lib/deimos.rb', line 191
# Lists every distinct producer in use: those registered in +@producers+
# followed by the default Karafka producer, without duplicates.
#
# @return [Array] unique producers
def waterdrop_producers
  [*@producers.values, Karafka.producer].uniq
end
|