Module: Deimos
- Includes:
- FigTree
- Defined in:
- lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb
Overview
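Generates a migration for bulk import ID in consumer. (Note: this summary appears to have been picked up from the bulk import ID generator, lib/generators/deimos/bulk_import_id_generator.rb, rather than describing the Deimos module as a whole.)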
Defined Under Namespace
Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils
Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, SchemaField, SchemaRoute, Transcoder
Constant Summary collapse
- EVENT_TYPES =
%w( deimos.ar_consumer.consume_batch deimos.encode_message deimos.batch_consumption.invalid_records deimos.batch_consumption.valid_records deimos.outbox.produce )
- VERSION =
'2.2.0'
Class Method Summary collapse
- .decode(schema:, namespace:, payload:) ⇒ Hash?
- .decode_message(message) ⇒ Object
-
.disable_producers(*producer_classes, &block) ⇒ void
Run a block without allowing any messages to be produced to Kafka.
- .encode(schema:, namespace:, payload:, subject: nil) ⇒ String
- .generate_key_schemas ⇒ Object
- .karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
-
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
Array<Karafka::Routing::Topic>.
-
.load_generated_schema_classes ⇒ void
Loads generated classes.
-
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class.
- .schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
- .schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
- .setup_karafka ⇒ Object
-
.start_outbox_backend!(thread_count: 1) ⇒ void
Start the DB producers to send Kafka messages.
- .topic_for_consumer(handler_class) ⇒ String?
Class Method Details
.decode(schema:, namespace:, payload:) ⇒ Hash?
102 103 104 |
# File 'lib/deimos.rb', line 102

# Decodes a payload with the schema backend resolved for the given schema
# and namespace.
#
# @param schema the schema identifier handed to {schema_backend}
# @param namespace the schema namespace handed to {schema_backend}
# @param payload the encoded payload handed to the backend's +decode+
# @return [Hash, nil] whatever the backend's +decode+ returns
def decode(schema:, namespace:, payload:)
  backend = schema_backend(schema: schema, namespace: namespace)
  backend.decode(payload)
end
.decode_message(message) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/deimos.rb', line 107

# Decodes the payload (and, when possible, the key) of a message hash in
# place, using the deserializers of the Karafka topic config that matches
# the message's topic.
#
# If a producer topic prefix is configured, its first occurrence is removed
# from the topic name before the config lookup. Nothing is changed when no
# config matches the topic.
#
# @param message [Hash] message hash read via +:topic+, +:payload+ and
#   +:key+; +:payload+ and +:key+ are overwritten with their decoded values
# @return [Object] nil when no topic config is found; otherwise the value of
#   the trailing key-decoding conditional (not meant to be relied upon)
def decode_message(message)
  topic = message[:topic]
  if Deimos.config.producers.topic_prefix
    topic = topic.sub(Deimos.config.producers.topic_prefix, '')
  end
  config = karafka_config_for(topic: topic)
  return unless config

  message[:payload] = config.deserializers[:payload].decode_message_hash(message[:payload])
  # The key is only decoded when one is present and the key deserializer
  # actually supports hash decoding.
  if message[:key] && config.deserializers[:key].respond_to?(:decode_message_hash)
    message[:key] = config.deserializers[:key].decode_message_hash(message[:key])
  end
end
.disable_producers(*producer_classes, &block) ⇒ void
This method returns an undefined value.
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/deimos/producer.rb', line 16

# Runs the given block with producing disabled.
#
# With producer classes given, the work is delegated to
# +_disable_producer_classes+. Without any, a thread-local "disable all"
# flag is set for the duration of the block and reset to false afterwards.
#
# @param producer_classes [Array<Class>] optional classes to limit the
#   disabling to
# @yield the block to run while producers are disabled
# @return [void]
def disable_producers(*producer_classes, &block)
  flag = :frk_disable_all_producers

  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  # Nested call inside an outer disable block: leave the flag alone so the
  # outer block is the one that resets it.
  if Thread.current[flag]
    yield
    return
  end

  Thread.current[flag] = true
  begin
    yield
  ensure
    Thread.current[flag] = false
  end
end
.encode(schema:, namespace:, payload:, subject: nil) ⇒ String
93 94 95 96 |
# File 'lib/deimos.rb', line 93

# Encodes a payload with the schema backend resolved for the given schema
# and namespace.
#
# @param schema the schema identifier handed to {schema_backend}
# @param namespace the schema namespace handed to {schema_backend}
# @param payload the payload handed to the backend's +encode+
# @param subject optional value passed as the backend's +topic:+ argument;
#   defaults to "<namespace>.<schema>" when nil
# @return [String] whatever the backend's +encode+ returns
def encode(schema:, namespace:, payload:, subject: nil)
  backend = schema_backend(schema: schema, namespace: namespace)
  topic = subject || "#{namespace}.#{schema}"
  backend.encode(payload, topic: topic)
end
.generate_key_schemas ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/deimos/config/configuration.rb', line 23

# Walks every Karafka topic config and, for each key deserializer that
# exposes a non-nil +key_field+ and whose backend supports key schemas,
# replaces the deserializer's backend with one built from the topic's
# schema/namespace and asks it to generate the key schema.
#
# @return [Object] the collection returned by +Deimos.karafka_configs.each+
def generate_key_schemas
  Deimos.karafka_configs.each do |topic_config|
    key_transcoder = topic_config.deserializers[:key]
    next unless key_transcoder.respond_to?(:key_field)
    next unless key_transcoder.key_field
    next unless key_transcoder.backend.supports_key_schemas?

    key_transcoder.backend = Deimos.schema_backend(
      schema: topic_config.schema,
      namespace: topic_config.namespace,
      backend: key_transcoder.backend_type
    )
    key_transcoder.backend.generate_key_schema(key_transcoder.key_field)
  end
end
.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?
171 172 173 174 175 176 177 |
# File 'lib/deimos.rb', line 171

# Finds the first Karafka topic config matching either a topic name or a
# producer class. When both are given, +topic+ takes precedence.
#
# @param topic the topic name to compare against each config's +name+
# @param producer the producer class to look for in each config's
#   +producer_classes+
# @return [Karafka::Routing::Topic, nil] the first match, or nil when
#   nothing matches or neither argument is given
def karafka_config_for(topic: nil, producer: nil)
  return karafka_configs.find { |route| route.name == topic } if topic
  return unless producer

  karafka_configs.find { |route| route.producer_classes&.include?(producer) }
end
.karafka_configs ⇒ Array<Karafka::Routing::Topic>
Returns Array<Karafka::Routing::Topic>.
165 166 167 |
# File 'lib/deimos.rb', line 165

# Flattens the topics of every route registered with Karafka into a single
# array.
#
# @return [Array<Karafka::Routing::Topic>]
def karafka_configs
  topic_collections = Karafka::App.routes.flat_map { |route| route.topics }
  topic_collections.flat_map { |topics| topics.to_a }
end
.load_generated_schema_classes ⇒ void
This method returns an undefined value.
Loads generated classes
40 41 42 43 44 45 46 47 48 |
# File 'lib/deimos/config/configuration.rb', line 40

# Requires every Ruby file found (recursively, in sorted order) under the
# configured +schema.generated_class_path+, relative to the current
# directory.
#
# @return [void]
# @raise [RuntimeError] when no generated class path is configured, or when
#   requiring one of the files raises a LoadError
def load_generated_schema_classes
  class_path = Deimos.config.schema.generated_class_path
  if class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. Please provide a directory.'
  end

  schema_files = Dir.glob("./#{class_path}/**/*.rb").sort
  schema_files.each do |file|
    require file
  end
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with rake deimos:generate_schema_models.'
end
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
52 53 54 55 56 57 |
# File 'lib/deimos/producer.rb', line 52

# Reports whether producing is currently disabled.
#
# The check order is: the +producers.disabled+ config setting, then the
# thread-local "disable all" flag, then membership of +producer_class+ in
# the thread-local list of disabled producers.
#
# @param producer_class [Class, nil] class to look up in the thread-local
#   disabled list
# @return [Boolean, nil] truthy when disabled; may be nil when no
#   thread-local disabled list exists
def producers_disabled?(producer_class = nil)
  return true if Deimos.config.producers.disabled

  all_disabled = Thread.current[:frk_disable_all_producers]
  return all_disabled if all_disabled

  Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
.schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/deimos.rb', line 70

# Builds a schema backend instance for the given schema and namespace.
#
# When +schema.use_schema_classes+ is enabled and a generated schema class
# is found, the backend is built from the schema and namespace reported by
# an instance of that class instead of the ones passed in, so overriding
# classes contribute their inherited values.
#
# @param schema the schema identifier
# @param namespace the schema namespace
# @param backend the backend identifier passed to {schema_backend_class};
#   defaults to the configured +schema.backend+
# @return [Deimos::SchemaBackends::Base]
def schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend)
  if config.schema.use_schema_classes
    schema_class = Utils::SchemaClass.klass(schema, namespace)
    unless schema_class.nil?
      # allocate skips initialize; only the schema/namespace readers are used.
      schema_instance = schema_class.allocate
      return schema_backend_class(backend: backend).new(
        schema: schema_instance.schema,
        namespace: schema_instance.namespace
      )
    end
  end

  schema_backend_class(backend: backend).new(schema: schema, namespace: namespace)
end
.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>
59 60 61 62 63 64 65 |
# File 'lib/deimos.rb', line 59

# Resolves the schema backend class for a backend identifier.
#
# @param backend the backend identifier; falls back to the configured
#   +schema.backend+ when nil or false
# @return [Class<Deimos::SchemaBackends::Base>]
def schema_backend_class(backend: nil)
  backend_name = backend || Deimos.config.schema.backend

  # Loaded here rather than at the top of the file, so only the requested
  # backend's file is required.
  require "deimos/schema_backends/#{backend_name}"

  class_name = "Deimos::SchemaBackends::#{backend_name.to_s.classify}"
  class_name.constantize
end
.setup_karafka ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/deimos.rb', line 145

# Hooks Deimos into the Karafka producer and monitor:
# - appends Deimos::ProducerMiddleware to the producer middleware,
# - re-assigns the producer's kafka config from a copy of Karafka's config,
# - registers every EVENT_TYPES entry on Karafka's notifications bus,
# - subscribes a ProducerMetricsListener to the producer monitor,
# - logs an error line for 'error.occurred' events that carry messages.
#
# @return [Object] the result of the final +subscribe+ call
def setup_karafka
  Karafka.producer.middleware.append(Deimos::ProducerMiddleware)
  # for multiple setup calls
  Karafka.producer.config.kafka =
    Karafka::Setup::AttributesMap.producer(Karafka::Setup::Config.config.kafka.dup)
  EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) }

  Karafka.producer.monitor.subscribe(ProducerMetricsListener.new)

  Karafka.producer.monitor.subscribe('error.occurred') do |event|
    # Only events that carry messages can be tied back to a topic config.
    if event.payload.key?(:messages)
      topic = event[:messages].first[:topic]
      config = Deimos.karafka_config_for(topic: topic)
      # NOTE(review): the helper name and the `.message` call below were
      # missing from this listing and have been reconstructed; confirm both
      # against lib/deimos.rb and lib/deimos/logging.rb.
      log_text = Deimos::Logging.messages_log_text(config&.payload_log, event[:messages])
      Karafka.logger.error("Error producing messages: #{event[:error].message} #{log_text.to_json}")
    end
  end
end
.start_outbox_backend!(thread_count: 1) ⇒ void
This method returns an undefined value.
Start the DB producers to send Kafka messages.
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/deimos.rb', line 124

# Starts the outbox producers that send stored messages to Kafka and runs
# them under a Sigurd signal handler.
#
# @param thread_count [Integer] number of OutboxProducer instances to
#   create
# @return [void]
# @raise [RuntimeError] when the producer backend is not :outbox, or when
#   +thread_count+ is nil or zero
def start_outbox_backend!(thread_count: 1)
  # Set before validation, matching the original statement order.
  Sigurd.exit_on_signal = true

  raise('Publish backend is not set to :outbox, exiting') if config.producers.backend != :outbox
  raise('Thread count is not given or set to zero, exiting') if thread_count.nil? || thread_count.zero?

  producers = (1..thread_count).map do |_index|
    producer_logger = config.outbox.logger || Karafka.logger
    Deimos::Utils::OutboxProducer.new(producer_logger)
  end

  executor = Sigurd::Executor.new(producers, sleep_seconds: 5, logger: Karafka.logger)
  Sigurd::SignalHandler.new(executor).run!
end
.topic_for_consumer(handler_class) ⇒ String?
181 182 183 184 185 186 187 188 |
# File 'lib/deimos.rb', line 181

# Looks up the name of the first Karafka topic whose consumer is the given
# handler class.
#
# @param handler_class [Class] the consumer class to search for
# @return [String, nil] the matching topic's name, or nil when no topic
#   uses that consumer
def topic_for_consumer(handler_class)
  matching_topic = Deimos.karafka_configs.find { |topic| topic.consumer == handler_class }
  matching_topic&.name
end