Module: Deimos

Includes:
FigTree
Defined in:
lib/deimos.rb,
lib/deimos/logging.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/poll_info.rb,
lib/deimos/exceptions.rb,
lib/deimos/transcoder.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/backends/base.rb,
lib/deimos/backends/test.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/backends/outbox.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/utils/db_poller.rb,
lib/deimos/ext/schema_route.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/schema_class/base.rb,
lib/deimos/schema_class/enum.rb,
lib/deimos/ext/consumer_route.rb,
lib/deimos/ext/producer_route.rb,
lib/deimos/utils/schema_class.rb,
lib/deimos/schema_class/record.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/config/configuration.rb,
lib/deimos/schema_backends/base.rb,
lib/deimos/schema_backends/mock.rb,
lib/deimos/utils/db_poller/base.rb,
lib/deimos/utils/deadlock_retry.rb,
lib/deimos/schema_backends/plain.rb,
lib/deimos/utils/outbox_producer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/deimos/ext/producer_middleware.rb,
lib/deimos/metrics/minimal_datadog.rb,
lib/generators/deimos/v2_generator.rb,
lib/deimos/consume/batch_consumption.rb,
lib/deimos/schema_backends/avro_base.rb,
lib/deimos/schema_backends/avro_local.rb,
lib/deimos/schema_backends/proto_base.rb,
lib/deimos/utils/db_poller/time_based.rb,
lib/deimos/consume/message_consumption.rb,
lib/deimos/schema_backends/proto_local.rb,
lib/deimos/utils/db_poller/state_based.rb,
lib/deimos/ext/producer_metrics_listener.rb,
lib/generators/deimos/db_poller_generator.rb,
lib/deimos/schema_backends/avro_validation.rb,
lib/deimos/metrics/minimal_datadog_listener.rb,
lib/generators/deimos/schema_class_generator.rb,
lib/deimos/active_record_consume/batch_record.rb,
lib/deimos/active_record_consume/batch_slicer.rb,
lib/deimos/active_record_consume/mass_updater.rb,
lib/generators/deimos/active_record_generator.rb,
lib/deimos/schema_backends/avro_schema_coercer.rb,
lib/generators/deimos/bulk_import_id_generator.rb,
lib/generators/deimos/outbox_backend_generator.rb,
lib/deimos/schema_backends/avro_schema_registry.rb,
lib/deimos/schema_backends/proto_schema_registry.rb,
lib/deimos/active_record_consume/batch_consumption.rb,
lib/deimos/active_record_consume/batch_record_list.rb,
lib/deimos/active_record_consume/message_consumption.rb,
lib/deimos/active_record_consume/schema_model_converter.rb

Overview

Generates a migration for bulk import ID in consumer.

Defined Under Namespace

Modules: ActiveRecordConsume, Backends, Consume, Generators, KafkaSource, Logging, Metrics, ProducerMiddleware, SchemaBackends, SchemaClass, SharedConfig, TestHelpers, Tracing, Utils Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroSchemaCoercer, Consumer, ConsumerRoute, KafkaMessage, KafkaTopicInfo, Message, MissingImplementationError, PollInfo, Producer, ProducerMetricsListener, ProducerRoute, Railtie, SchemaField, SchemaRoute, Transcoder

Constant Summary collapse

EVENT_TYPES =
%w(
  deimos.ar_consumer.consume_batch
  deimos.encode_message
  deimos.batch_consumption.invalid_records
  deimos.batch_consumption.valid_records
  deimos.outbox.produce
).freeze
VERSION =
'2.3.0'

Class Method Summary collapse

Class Method Details

.decode(schema:, namespace:, payload:) ⇒ Hash?



102
103
104
# File 'lib/deimos.rb', line 102

# Decode an encoded payload using the schema backend for the given
# schema and namespace.
# @param schema [String] the schema name.
# @param namespace [String] the schema namespace.
# @param payload [String] the encoded payload.
# @return [Hash, nil] the decoded payload.
def decode(schema:, namespace:, payload:)
  backend = self.schema_backend(schema: schema, namespace: namespace)
  backend.decode(payload)
end

.decode_message(message) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/deimos.rb', line 107

# Decode a Karafka message hash in place: strips the configured topic
# prefix, looks up the topic's Deimos config, and replaces the encoded
# payload (and the key, when the key deserializer supports
# +decode_message_hash+) with decoded values.
#
# Fix: the original returned the trailing +if+ expression (nil or the
# decoded key) on the main path but +message+ on the no-config path;
# now the (mutated) message hash is returned consistently.
#
# @param message [Hash] a message hash with :topic, :payload and optional :key.
# @return [Hash] the mutated message hash.
def decode_message(message)
  topic = message[:topic]
  prefix = Deimos.config.producers.topic_prefix
  topic = topic.sub(prefix, '') if prefix
  config = karafka_config_for(topic: topic)
  return message unless config

  message[:payload] = config.deserializers[:payload].decode_message_hash(message[:payload])
  if message[:key] && config.deserializers[:key].respond_to?(:decode_message_hash)
    message[:key] = config.deserializers[:key].decode_message_hash(message[:key])
  end
  message
end

.disable_producers(*producer_classes, &block) ⇒ void

This method returns an undefined value.

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/deimos/producer.rb', line 16

# Run a block without allowing any messages to be produced to Kafka.
# Optionally pass a list of producer classes to limit the disabling to
# those classes only.
# @param producer_classes [Array<Class>] producers to disable; when empty,
#   all producers are disabled for the duration of the block.
# @return [void]
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    nil
  elsif Thread.current[:frk_disable_all_producers]
    # Nested disable block — the flag is already set, so just run the block.
    yield
    nil
  else
    Thread.current[:frk_disable_all_producers] = true
    begin
      yield
    ensure
      Thread.current[:frk_disable_all_producers] = false
    end
  end
end

.encode(schema:, namespace:, payload:, subject: nil) ⇒ String



93
94
95
96
# File 'lib/deimos.rb', line 93

# Encode a payload with the schema backend for the given schema and
# namespace. The subject defaults to "namespace.schema" when not given.
# @param schema [String] the schema name.
# @param namespace [String] the schema namespace.
# @param payload [Hash] the payload to encode.
# @param subject [String, nil] optional registry subject override.
# @return [String] the encoded payload.
def encode(schema:, namespace:, payload:, subject: nil)
  backend = self.schema_backend(schema: schema, namespace: namespace)
  backend.encode(payload, topic: subject || "#{namespace}.#{schema}")
end

.generate_key_schemasObject



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/deimos/config/configuration.rb', line 23

# For every configured topic whose key transcoder declares a key field
# and whose backend supports key schemas, install a fresh schema backend
# and generate the key schema for that field.
# @return [void]
def generate_key_schemas
  Deimos.karafka_configs.each do |config|
    key_transcoder = config.deserializers[:key]

    next unless key_transcoder.respond_to?(:key_field)
    next unless key_transcoder.key_field && key_transcoder.backend.supports_key_schemas?

    key_transcoder.backend = Deimos.schema_backend(
      schema: config.schema,
      namespace: config.namespace,
      backend: key_transcoder.backend_type
    )
    key_transcoder.backend.generate_key_schema(key_transcoder.key_field)
  end
end

.karafka_config_for(topic: nil, producer: nil) ⇒ Karafka::Routing::Topic?



182
183
184
185
186
187
188
# File 'lib/deimos.rb', line 182

# Look up a Karafka topic configuration, either by topic name or by the
# producer class registered for it.
# @param topic [String, nil] the topic name to match.
# @param producer [Class, nil] the producer class to match.
# @return [Karafka::Routing::Topic, nil] the matching config, or nil.
def karafka_config_for(topic: nil, producer: nil)
  return karafka_configs.find { |config| config.name == topic } if topic
  return karafka_configs.find { |config| config.producer_classes&.include?(producer) } if producer

  nil
end

.karafka_configs ⇒ Array<Karafka::Routing::Topic>



176
177
178
# File 'lib/deimos.rb', line 176

# All Karafka topic configurations, flattened across every route.
# @return [Array<Karafka::Routing::Topic>]
def karafka_configs
  Karafka::App.routes.flat_map { |route| route.topics.to_a }
end

.load_generated_schema_classesvoid

This method returns an undefined value.

Loads generated classes



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/deimos/config/configuration.rb', line 40

# Load all generated schema classes from the configured directory.
#
# Fix: the rescue message previously concatenated to "...classes
# withrake deimos:..." because the line continuation was missing a
# trailing space; the space is now included.
#
# @return [void]
# @raise [RuntimeError] if generated_class_path is unset, or if any
#   generated class file fails to load.
def load_generated_schema_classes
  if Deimos.config.schema.generated_class_path.nil?
    raise 'Cannot use schema classes without schema.generated_class_path. ' \
          'Please provide a directory.'
  end

  Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].sort.
    each { |f| require f }
rescue LoadError
  raise 'Cannot load schema classes. Please regenerate classes with ' \
        'rake deimos:generate_schema_models.'
end

.producer_for(topic) ⇒ Object



196
197
198
# File 'lib/deimos.rb', line 196

# Fetch the WaterDrop producer registered for a topic, falling back to
# the default Karafka producer when none is registered.
# @param topic [String] the topic name.
# @return [WaterDrop::Producer]
def producer_for(topic)
  registered = @producers[topic]
  registered || Karafka.producer
end

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.



52
53
54
55
56
57
# File 'lib/deimos/producer.rb', line 52

# Are producers disabled? If a class is passed in, check only that class.
# Otherwise check whether the global disable flag is set.
# @param producer_class [Class, nil] the producer class to check.
# @return [Boolean]
def producers_disabled?(producer_class=nil)
  return true if Deimos.config.producers.disabled
  return true if Thread.current[:frk_disable_all_producers]

  Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend) ⇒ Deimos::SchemaBackends::Base



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/deimos.rb', line 70

# Build a schema backend instance for the given schema and namespace.
# When schema classes are in use and a generated class exists for this
# schema, that class's own schema/namespace take precedence (so an
# override class resolves to its inherited schema).
#
# Refactor: the backend constructor was duplicated across three branches;
# it is now built once after resolving the effective schema/namespace.
#
# @param schema [String, Symbol] the schema name.
# @param namespace [String] the schema namespace.
# @param backend [Symbol] the schema backend type to instantiate.
# @return [Deimos::SchemaBackends::Base] the backend instance.
def schema_backend(schema:, namespace:, backend: Deimos.config.schema.backend)
  if config.schema.use_schema_classes
    # In the event the schema class is an override, the inherited
    # schema and namespace will be applied.
    schema_class = Utils::SchemaClass.klass(schema, namespace)
    unless schema_class.nil?
      # allocate avoids running the class's initializer.
      schema_instance = schema_class.allocate
      schema = schema_instance.schema
      namespace = schema_instance.namespace
    end
  end
  schema_backend_class(backend: backend).new(schema: schema, namespace: namespace)
end

.schema_backend_class(backend: nil) ⇒ Class<Deimos::SchemaBackends::Base>



59
60
61
62
63
64
65
# File 'lib/deimos.rb', line 59

# Resolve (and require) the schema backend class for a backend type.
# @param backend [Symbol, nil] the backend type; defaults to the
#   configured schema backend.
# @return [Class<Deimos::SchemaBackends::Base>]
def schema_backend_class(backend: nil)
  backend_name = (backend || Deimos.config.schema.backend).to_s
  require "deimos/schema_backends/#{backend_name}"
  "Deimos::SchemaBackends::#{backend_name.classify}".constantize
end

.setup_karafkaObject



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/deimos.rb', line 158

# Wire Deimos into every WaterDrop producer: append the producer
# middleware, subscribe the metrics listener, and log produce errors;
# then register the Deimos event types on the Karafka notifications bus.
# @return [void]
def setup_karafka
  setup_producers
  waterdrop_producers.each do |wd_producer|
    wd_producer.middleware.append(Deimos::ProducerMiddleware)
    wd_producer.monitor.subscribe(ProducerMetricsListener.new)
    wd_producer.monitor.subscribe('error.occurred') do |event|
      # Only produce-failure events carry :messages in their payload.
      next unless event.payload.key?(:messages)

      failed = event[:messages]
      topic_config = Deimos.karafka_config_for(topic: failed.first[:topic])
      log_text = Deimos::Logging.messages_log_text(topic_config&.payload_log, failed)
      Karafka.logger.error("Error producing messages: #{event[:error].message} #{log_text.to_json}")
    end
  end
  EVENT_TYPES.each { |type| Karafka.monitor.notifications_bus.register_event(type) }
end

.setup_producersObject



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/deimos.rb', line 145

# Build the topic -> WaterDrop producer map. Topics that share the same
# bootstrap servers share a single producer instance.
# @return [void]
def setup_producers
  @producers = {}
  by_broker = {}
  Deimos.karafka_configs.each do |topic|
    brokers = topic.kafka[:'bootstrap.servers']
    # Only the first topic seen for a broker list builds the producer;
    # subsequent topics with the same brokers reuse it.
    by_broker[brokers] ||= ::WaterDrop::Producer.new do |wd_config|
      merged_kafka = Karafka::Setup::Config.config.kafka.merge(topic.kafka)
      wd_config.kafka = Karafka::Setup::AttributesMap.producer(merged_kafka)
    end
    @producers[topic.name] = by_broker[brokers]
  end
end

.start_outbox_backend!(thread_count: 1) ⇒ void

This method returns an undefined value.

Start the DB producers to send Kafka messages.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/deimos.rb', line 124

# Start the outbox producer threads that forward DB-stored messages
# to Kafka, running under a signal handler until terminated.
# @param thread_count [Integer] number of producer threads to run.
# @return [void]
# @raise [RuntimeError] if the publish backend is not :outbox, or if
#   thread_count is nil or zero.
def start_outbox_backend!(thread_count: 1)
  Sigurd.exit_on_signal = true
  raise('Publish backend is not set to :outbox, exiting') if self.config.producers.backend != :outbox
  raise('Thread count is not given or set to zero, exiting') if thread_count.nil? || thread_count.zero?

  workers = Array.new(thread_count) do
    Deimos::Utils::OutboxProducer.new(self.config.outbox.logger || Karafka.logger)
  end
  executor = Sigurd::Executor.new(workers, sleep_seconds: 5, logger: Karafka.logger)
  Sigurd::SignalHandler.new(executor).run!
end

.topic_for_consumer(handler_class) ⇒ String?



201
202
203
204
205
206
207
208
# File 'lib/deimos.rb', line 201

# Find the topic name that the given consumer class is registered for.
#
# Refactor: the manual each/early-return loop is replaced with the
# idiomatic Enumerable#find plus safe navigation.
#
# @param handler_class [Class] the consumer class to look up.
# @return [String, nil] the topic name, or nil if no topic uses it.
def topic_for_consumer(handler_class)
  Deimos.karafka_configs.find { |topic| topic.consumer == handler_class }&.name
end

.waterdrop_producersArray<::WaterDrop::Producer>



191
192
193
# File 'lib/deimos.rb', line 191

# All distinct WaterDrop producers: the per-topic producers plus the
# default Karafka producer.
# @return [Array<::WaterDrop::Producer>]
def waterdrop_producers
  all = @producers.values
  all << Karafka.producer
  all.uniq
end