Module: Deimos
- Includes:
- FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
Generates a migration for bulk import ID in consumer.
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils
Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, RegistryInfo, SchemaField, SchemaRoute, Transcoder
Constant Summary
collapse
- EVENT_TYPES =
%w(
deimos.ar_consumer.consume_batch
deimos.encode_message
deimos.batch_consumption.invalid_records
deimos.batch_consumption.valid_records
deimos.outbox.produce
).freeze
- VERSION =
'2.4.0'
Class Attribute Summary collapse
Class Method Summary
collapse
-
.decode(schema:, namespace:, payload:) ⇒ Hash?
-
.decode_message(message) ⇒ Object
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
-
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
-
.generate_key_schemas ⇒ Object
-
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
-
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
Array<Karafka::Routing::Topic>.
-
.load_generated_schema_classes ⇒ void
-
.producer_for(topic) ⇒ Object
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
-
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
-
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
-
.schema_backend_for(topic_name) ⇒ SchemaBackends::Base
-
.setup_karafka ⇒ Object
-
.setup_producers ⇒ Object
-
.start_outbox_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
-
.topic_for_consumer(handler_class) ⇒ String?
-
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
Class Attribute Details
.mock_backends ⇒ Boolean
Returns a Boolean flag intended for use in unit tests.
58
59
60
|
# File 'lib/deimos.rb', line 58
# Reader for the module-level `@mock_backends` flag.
#
# `schema_backend_class` checks this flag and, when it is truthy, swaps the
# configured schema backend class for that class's `mock_backend`.
# @return [Boolean] for use in unit tests.
def mock_backends
@mock_backends
end
|
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ Hash?
140
141
142
|
# File 'lib/deimos.rb', line 140
# Decodes an encoded payload using the schema backend built for the given
# schema name and namespace.
# @param schema [String, Symbol] schema name handed to `schema_backend`.
# @param namespace [String] schema namespace handed to `schema_backend`.
# @param payload [String] encoded payload passed to the backend's `decode`.
# @return [Hash, nil] whatever the backend's `decode` returns.
def decode(schema:, namespace:, payload:)
  backend = schema_backend(schema: schema, namespace: namespace)
  backend.decode(payload)
end
|
.decode_message(message) ⇒ Object
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/deimos.rb', line 145
# Decodes the payload (and, when possible, the key) of a message hash in
# place, using the deserializers of the Karafka topic config that matches the
# message's topic.
#
# The configured producer topic prefix, if any, is stripped from the *start*
# of the topic name before the lookup.
#
# NOTE: the message hash is mutated. The return value is not uniform:
# `message` is returned when no topic config matches; otherwise the value of
# the trailing key-decoding `if` is returned (the decoded key, or nil when the
# key was not decoded). Callers should rely on the mutation.
# @param message [Hash] hash with `:topic`, `:payload` and optionally `:key`.
# @return [Object] see note above.
def decode_message(message)
  topic = message[:topic]
  prefix = Deimos.config.producers.topic_prefix
  # `String#sub` with a String pattern removes the first occurrence anywhere
  # in the name; only a leading prefix must be removed.
  topic = topic.delete_prefix(prefix) if prefix
  config = karafka_config_for(topic: topic)
  return message unless config

  message[:payload] = config.deserializers[:payload].decode_message_hash(message[:payload])
  if message[:key] && config.deserializers[:key].respond_to?(:decode_message_hash)
    message[:key] = config.deserializers[:key].decode_message_hash(message[:key])
  end
end
|
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/deimos/producer.rb', line 16
# Run a block without allowing any messages to be produced to Kafka.
# Optionally pass producer classes to limit the disabling to those classes,
# in which case the work is delegated to `_disable_producer_classes`.
#
# The global flag is stored in `Thread.current[:frk_disable_all_producers]`
# and is reset to false when the outermost block finishes, even if it raises.
# @param producer_classes [Array<Class>] classes to disable; empty means all.
# @yield the block to run while producers are disabled.
# @return [void]
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    # Already disabled by an enclosing call: just run the block and leave the
    # flag for the outer call to reset.
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end
|
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
131
132
133
134
|
# File 'lib/deimos.rb', line 131
# Encodes a payload using the schema backend built for the given schema name
# and namespace.
# @param schema [String, Symbol] schema name handed to `schema_backend`.
# @param namespace [String] schema namespace handed to `schema_backend`.
# @param payload [Hash] payload passed to the backend's `encode`.
# @param subject [String, nil] value passed as `topic:` to the backend;
#   defaults to "<namespace>.<schema>" when not given.
# @return [String] whatever the backend's `encode` returns.
def encode(schema:, namespace:, payload:, subject: nil)
  backend = schema_backend(schema: schema, namespace: namespace)
  topic_name = subject || "#{namespace}.#{schema}"
  backend.encode(payload, topic: topic_name)
end
|
.generate_key_schemas ⇒ Object
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/deimos/config/configuration.rb', line 27
# Walks every Karafka topic config and, for key deserializers that expose a
# non-nil `key_field` and whose current backend reports
# `supports_key_schemas?`, assigns a fresh schema backend (built from the
# topic's schema/namespace and the transcoder's `backend_type`) and asks it to
# generate a key schema for that field.
# @return [Object] the result of iterating `Deimos.karafka_configs`.
def generate_key_schemas
  Deimos.karafka_configs.each do |topic_config|
    key_transcoder = topic_config.deserializers[:key]
    next unless key_transcoder.respond_to?(:key_field)
    next unless key_transcoder.key_field
    next unless key_transcoder.backend.supports_key_schemas?

    key_transcoder.backend = Deimos.schema_backend(
      schema: topic_config.schema,
      namespace: topic_config.namespace,
      backend: key_transcoder.backend_type
    )
    key_transcoder.backend.generate_key_schema(key_transcoder.key_field)
  end
end
|
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
225
226
227
228
229
230
231
|
# File 'lib/deimos.rb', line 225
# Finds the first Karafka topic config matching either a topic name or a
# producer class. `topic` takes precedence when both are given.
# @param topic [String, nil] topic name compared against each config's `name`.
# @param producer [Class, nil] producer class looked up in each config's
#   `producer_classes` (configs without producer classes are skipped).
# @return [Karafka::Routing::Topic, nil] nil when nothing matches or when
#   neither argument is given.
def karafka_config_for(topic: nil, producer: nil)
  return karafka_configs.find { |route| route.name == topic } if topic
  return unless producer

  karafka_configs.find { |route| route.producer_classes&.include?(producer) }
end
|
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
Returns Array<Karafka::Routing::Topic>.
219
220
221
|
# File 'lib/deimos.rb', line 219
# Collects the topics of every entry in `Karafka::App.routes` into one flat
# array.
# @return [Array<Karafka::Routing::Topic>]
def karafka_configs
  topic_collections = Karafka::App.routes.flat_map { |route| route.topics }
  topic_collections.flat_map { |topics| topics.to_a }
end
|
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/deimos/config/configuration.rb', line 44
# Requires every `.rb` file found (recursively) under
# `Deimos.config.schema.generated_class_path`, resolved relative to the
# current working directory.
# @raise [RuntimeError] if `generated_class_path` is nil, or if requiring any
#   file raises a `LoadError`.
# @return [void]
def load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. ' \
          'Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].
    each { |f| require f }
rescue LoadError
  # Adjacent literals are concatenated verbatim, so the first one needs a
  # trailing space to keep "with" and "rake" apart.
  raise 'Cannot load schema classes. Please regenerate classes with ' \
        'rake deimos:generate_schema_models.'
end
|
.producer_for(topic) ⇒ Object
239
240
241
|
# File 'lib/deimos.rb', line 239
# Looks up the producer registered for a topic in `@producers`, falling back
# to `Karafka.producer` when the topic has no entry.
# @param topic [String] topic name used as the `@producers` key.
# @return [Object] the topic's producer or the default Karafka producer.
def producer_for(topic)
  dedicated = @producers[topic]
  dedicated || Karafka.producer
end
|
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
52
53
54
55
56
57
|
# File 'lib/deimos/producer.rb', line 52
# Are producers disabled? Checks, in order: the `producers.disabled` config
# setting, the thread-local global disable flag, and finally whether the given
# class is in the thread-local list of disabled producer classes.
# @param producer_class [Class, nil] class to look up in the per-class list.
# @return [Boolean, nil] truthy when disabled; may be nil when neither
#   thread-local value is set.
def producers_disabled?(producer_class=nil)
  return true if Deimos.config.producers.disabled

  all_disabled = Thread.current[:frk_disable_all_producers]
  return all_disabled if all_disabled

  disabled_classes = Thread.current[:frk_disabled_producers]
  disabled_classes&.include?(producer_class)
end
|
.schema_backend(schema:, namespace:, registry_info: nil, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/deimos.rb', line 98
# Builds a schema backend instance.
#
# When `config.schema.use_schema_classes` is set and a generated schema class
# is found for the schema/namespace, the backend is built from that class's
# own `schema` and `namespace` (read from an allocated, uninitialized
# instance). Otherwise the arguments are passed through as given.
# @param schema [String, Symbol] schema name.
# @param namespace [String] schema namespace.
# @param registry_info [Deimos::RegistryInfo, nil] forwarded to the backend.
# @param backend [Symbol] backend type handed to `schema_backend_class`;
#   defaults to `Deimos.config.schema.backend`.
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:,
                   registry_info: nil,
                   backend: Deimos.config.schema.backend)
  generated = nil
  if config.schema.use_schema_classes
    schema_class = Utils::SchemaClass.klass(schema, namespace)
    generated = schema_class.allocate unless schema_class.nil?
  end

  backend_class = schema_backend_class(backend: backend)
  backend_class.new(
    schema: generated ? generated.schema : schema,
    namespace: generated ? generated.namespace : namespace,
    registry_info: registry_info
  )
end
|
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/deimos.rb', line 62
# Resolves the schema backend class for a backend type, requiring its file
# under `deimos/schema_backends/` first. When `mock_backends` is truthy, the
# class named by the resolved class's `mock_backend` is required and returned
# instead.
# @param backend [Symbol, String, nil] backend type; defaults to
#   `Deimos.config.schema.backend`.
# @return [Class<Deimos::SchemaBackends::Base>]
def schema_backend_class(backend: nil)
  backend_name = backend || Deimos.config.schema.backend
  require "deimos/schema_backends/#{backend_name}"
  resolved = "Deimos::SchemaBackends::#{backend_name.to_s.classify}".constantize
  return resolved unless self.mock_backends

  require "deimos/schema_backends/#{resolved.mock_backend}"
  "Deimos::SchemaBackends::#{resolved.mock_backend.to_s.classify}".constantize
end
|
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/deimos.rb', line 77
# Builds the schema backend for a topic from its Karafka topic config.
# Registry credentials are wrapped in a `Deimos::RegistryInfo` only when the
# config has a `registry_url`; the backend type falls back to
# `Deimos.config.schema.backend` when the topic does not set one.
# @param topic_name [String] topic to look up via `Deimos.karafka_config_for`.
# @return [SchemaBackends::Base]
def schema_backend_for(topic_name)
  topic_config = Deimos.karafka_config_for(topic: topic_name)
  registry_info = nil
  if topic_config.registry_url
    registry_info = Deimos::RegistryInfo.new(
      topic_config.registry_url,
      topic_config.registry_user,
      topic_config.registry_password
    )
  end
  backend_type = topic_config.schema_backend || Deimos.config.schema.backend
  self.schema_backend(schema: topic_config.schema,
                      namespace: topic_config.namespace,
                      registry_info: registry_info,
                      backend: backend_type)
end
|
.setup_karafka ⇒ Object
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
# File 'lib/deimos.rb', line 196
# Wires Deimos into Karafka/WaterDrop:
# - builds the per-topic producers (`setup_producers`);
# - for every WaterDrop producer, appends `Deimos::ProducerMiddleware`,
#   subscribes a `ProducerMetricsListener`, and subscribes an
#   'error.occurred' handler that logs failed messages;
# - registers each of `EVENT_TYPES` on Karafka's notifications bus.
# @return [Object] the result of iterating `EVENT_TYPES`.
def setup_karafka
  setup_producers
  waterdrop_producers.each do |producer|
    producer.middleware.append(Deimos::ProducerMiddleware)
    producer.monitor.subscribe(ProducerMetricsListener.new)
    producer.monitor.subscribe('error.occurred') do |event|
      # Only errors that carry the failed messages are logged here.
      next unless event.payload.key?(:messages)

      first_topic = event[:messages].first[:topic]
      topic_config = Deimos.karafka_config_for(topic: first_topic)
      log_text = Deimos::Logging.messages_log_text(topic_config&.payload_log, event[:messages])
      # Fall back to a plain string if the log text cannot be serialized.
      serialized =
        begin
          log_text.to_json
        rescue StandardError
          log_text.to_s
        end
      Karafka.logger.error("Error producing messages: #{event[:error].message} #{serialized}")
    end
  end
  EVENT_TYPES.each do |event_type|
    Karafka.monitor.notifications_bus.register_event(event_type)
  end
end
|
.setup_producers ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
194
|
# File 'lib/deimos.rb', line 183
# Rebuilds `@producers`, a map of topic name => WaterDrop producer. Topics
# that share the same `bootstrap.servers` value share one producer; each new
# producer's kafka settings are Karafka's global kafka config merged with the
# kafka config of the first topic seen for that broker.
# @return [Object] the result of iterating `Deimos.karafka_configs`.
def setup_producers
  @producers = {}
  shared_by_broker = {}
  Deimos.karafka_configs.each do |topic|
    servers = topic.kafka[:'bootstrap.servers']
    unless shared_by_broker[servers]
      shared_by_broker[servers] = ::WaterDrop::Producer.new do |producer_config|
        merged = Karafka::Setup::Config.config.kafka.merge(topic.kafka)
        producer_config.kafka = Karafka::Setup::AttributesMap.producer(merged)
      end
    end
    @producers[topic.name] = shared_by_broker[servers]
  end
end
|
.start_outbox_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
# File 'lib/deimos.rb', line 162
# Start the DB producers to send Kafka messages.
#
# Creates `thread_count` `Deimos::Utils::OutboxProducer` instances (using the
# configured outbox logger, or Karafka's logger when none is set), wraps them
# in a `Sigurd::Executor` and runs it through a `Sigurd::SignalHandler`.
# @param thread_count [Integer] number of outbox producers to create.
# @raise [RuntimeError] if the producer backend is not `:outbox`, or if
#   `thread_count` is nil or zero.
# @return [void]
def start_outbox_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true
  unless self.config.producers.backend == :outbox
    raise('Publish backend is not set to :outbox, exiting')
  end
  raise('Thread count is not given or set to zero, exiting') if thread_count.nil? || thread_count.zero?

  outbox_producers = 1.upto(thread_count).map do
    outbox_logger = self.config.outbox.logger || Karafka.logger
    Deimos::Utils::OutboxProducer.new(outbox_logger)
  end
  executor = Sigurd::Executor.new(outbox_producers,
                                  sleep_seconds: 5,
                                  logger: Karafka.logger)
  Sigurd::SignalHandler.new(executor).run!
end
|
.topic_for_consumer(handler_class) ⇒ String?
244
245
246
247
248
249
250
251
|
# File 'lib/deimos.rb', line 244
# Returns the name of the first Karafka topic whose `consumer` is the given
# handler class.
# @param handler_class [Class] consumer class to look for.
# @return [String, nil] the topic name, or nil when no topic uses the class.
def topic_for_consumer(handler_class)
  matching = Deimos.karafka_configs.find { |topic| topic.consumer == handler_class }
  matching&.name
end
|
.waterdrop_producers ⇒ Array<::WaterDrop::Producer>
234
235
236
|
# File 'lib/deimos.rb', line 234
# Lists every distinct WaterDrop producer in use: the per-topic producers
# stored in `@producers` plus the default `Karafka.producer`.
# @return [Array<::WaterDrop::Producer>]
def waterdrop_producers
  [*@producers.values, Karafka.producer].uniq
end
|