Module: Deimos
- Includes:
- Instrumentation
- Defined in:
- lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/configuration.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/schema_coercer.rb,
lib/deimos/utils/executor.rb,
lib/deimos/avro_data_coder.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/publish_backend.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/avro_data_decoder.rb,
lib/deimos/avro_data_encoder.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/utils/signal_handler.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/generators/deimos/db_backend_generator.rb
Overview
Class to consume messages. Can be used with integration testing frameworks. Assumes that you have a topic with only one partition.
Defined Under Namespace
Modules: Backends, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, SharedConfig, TestHelpers, Tracing, Utils
Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroDataCoder, AvroDataDecoder, AvroDataEncoder, Configuration, Consumer, KafkaMessage, KafkaTopicInfo, Message, Producer, PublishBackend, Railtie, SchemaCoercer
Constant Summary
- VERSION = '1.0.0'
Constants included from Instrumentation
Class Attribute Summary
- .config ⇒ Object
  Returns the value of attribute config.
Class Method Summary
- ._disable_producer_classes(producer_classes) ⇒ Object
  :nodoc:
- .configure {|config| ... } ⇒ Object
  Configure Deimos.
- .configure_kafka_for_phobos(phobos_config) ⇒ Object
- .configure_loggers(phobos_config) ⇒ Object
- .disable_producers(*producer_classes, &block) ⇒ Object
  Run a block without allowing any messages to be produced to Kafka.
- .producers_disabled?(producer_class = nil) ⇒ Boolean
  Are producers disabled? If a class is passed in, check only that class.
- .ssl_var_contents(filename) ⇒ String
  The contents of the file.
- .start_db_backend!(thread_count: 1) ⇒ Object
  Start the DB producers to send Kafka messages.
- .validate_db_backend ⇒ Object
  Ensure everything is set up correctly for the DB backend.
Class Attribute Details
.config ⇒ Object
Returns the value of attribute config.
# File 'lib/deimos.rb', line 42
def config
  @config
end
Class Method Details
._disable_producer_classes(producer_classes) ⇒ Object
:nodoc:
# File 'lib/deimos/producer.rb', line 37
def _disable_producer_classes(producer_classes)
  Thread.current[:frk_disabled_producers] ||= Set.new
  producers_to_disable = producer_classes -
                         Thread.current[:frk_disabled_producers].to_a
  Thread.current[:frk_disabled_producers] += producers_to_disable
  yield
  Thread.current[:frk_disabled_producers] -= producers_to_disable
end
.configure {|config| ... } ⇒ Object
Configure Deimos.
# File 'lib/deimos.rb', line 45
def configure
  first_time_config = self.config.nil?
  self.config ||= Configuration.new
  old_config = self.config.dup
  yield(config)

  # Don't re-configure Phobos every time
  if first_time_config || config.phobos_config_changed?(old_config)

    file = config.phobos_config_file
    phobos_config = YAML.load(ERB.new(File.read(File.expand_path(file))).result)

    configure_kafka_for_phobos(phobos_config)
    configure_loggers(phobos_config)

    Phobos.configure(phobos_config)
  end

  validate_db_backend if self.config.publish_backend == :db
end
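The Phobos config file is read, rendered through ERB, and only then parsed as YAML, so the file can embed Ruby expressions such as environment lookups. A minimal sketch of that loading step in isolation follows. The helper name load_erb_yaml, the SKETCH_YAML contents, and the SKETCH_CLIENT_ID variable are assumptions made for illustration and are not part of Deimos or Phobos.

```ruby
require 'erb'
require 'yaml'
require 'tempfile'

# Hypothetical helper (not part of Deimos): render ERB, then parse as YAML.
def load_erb_yaml(path)
  YAML.load(ERB.new(File.read(File.expand_path(path))).result)
end

# Made-up config contents, for illustration only.
SKETCH_YAML = <<~CONFIG
  kafka:
    client_id: <%= ENV.fetch('SKETCH_CLIENT_ID', 'my-app') %>
    seed_brokers:
      - "localhost:9092"
CONFIG

Tempfile.create(['phobos', '.yml']) do |file|
  file.write(SKETCH_YAML)
  file.flush
  phobos_config = load_erb_yaml(file.path)
  # The ERB tag has already been replaced by the time YAML sees the text.
  puts phobos_config['kafka']['client_id']
end
```

Because ERB runs first, the parsed hash holds only plain values; the template tags never reach the YAML parser.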
.configure_kafka_for_phobos(phobos_config) ⇒ Object
# File 'lib/deimos.rb', line 100
def configure_kafka_for_phobos(phobos_config)
  if config.ssl_enabled
    %w(ssl_ca_cert ssl_client_cert ssl_client_cert_key).each do |key|
      next if config.send(key).blank?

      phobos_config['kafka'][key] = ssl_var_contents(config.send(key))
    end
  end
  phobos_config['kafka']['seed_brokers'] = config.seed_broker if config.seed_broker
end
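The effect of this method on the Phobos hash can be sketched without loading Deimos. In the sketch below, SketchConfig, read_or_passthrough, and apply_kafka_overrides are hypothetical stand-ins, the certificate and broker values are invented, and the nil/empty check is a plain-Ruby approximation of ActiveSupport's blank?.

```ruby
# Hypothetical stand-in for the Deimos configuration object.
SketchConfig = Struct.new(:ssl_enabled, :ssl_ca_cert, :ssl_client_cert,
                          :ssl_client_cert_key, :seed_broker,
                          keyword_init: true)

# Accepts a file path or inline contents, as ssl_var_contents does.
def read_or_passthrough(value)
  File.exist?(value) ? File.read(value) : value
end

# Copies SSL settings and the seed broker into the 'kafka' section.
def apply_kafka_overrides(phobos_config, config)
  if config.ssl_enabled
    %w(ssl_ca_cert ssl_client_cert ssl_client_cert_key).each do |key|
      value = config.send(key)
      next if value.nil? || value.empty? # approximation of blank?

      phobos_config['kafka'][key] = read_or_passthrough(value)
    end
  end
  phobos_config['kafka']['seed_brokers'] = config.seed_broker if config.seed_broker
  phobos_config
end

# Invented values, for illustration only.
phobos_config = { 'kafka' => { 'seed_brokers' => ['localhost:9092'] } }
config = SketchConfig.new(ssl_enabled: true,
                          ssl_ca_cert: 'inline-ca-cert-text',
                          seed_broker: 'broker1:9092')
apply_kafka_overrides(phobos_config, config)
puts phobos_config['kafka'].keys.inspect
```

Unset SSL keys are skipped rather than written as nil, and seed_brokers from the file is replaced only when seed_broker is set.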
.configure_loggers(phobos_config) ⇒ Object
# File 'lib/deimos.rb', line 112
def configure_loggers(phobos_config)
  phobos_config['custom_logger'] = config.phobos_logger
  phobos_config['custom_kafka_logger'] = config.kafka_logger
end
.disable_producers(*producer_classes, &block) ⇒ Object
Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.
# File 'lib/deimos/producer.rb', line 17
def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end
.producers_disabled?(producer_class = nil) ⇒ Boolean
Are producers disabled? If a class is passed in, check only that class. Otherwise check if the global disable flag is set.
# File 'lib/deimos/producer.rb', line 49
def producers_disabled?(producer_class=nil)
  Thread.current[:frk_disable_all_producers] ||
    Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
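Both the global flag and the per-class set live in Thread.current, so disabling applies only to the thread that entered the block. The sketch below reproduces that logic in a standalone module so it can run without Deimos. ProducerSwitch, the :sketch_* keys, OrderProducer, and AuditProducer are hypothetical names chosen for illustration.

```ruby
require 'set'

# Hypothetical stand-in for Deimos; mirrors the thread-local logic above.
module ProducerSwitch
  class << self
    def disable_producers(*producer_classes, &block)
      if producer_classes.any?
        disable_classes(producer_classes, &block)
        return
      end

      if Thread.current[:sketch_disable_all] # nested disable block
        yield
        return
      end

      begin
        Thread.current[:sketch_disable_all] = true
        yield
      ensure
        Thread.current[:sketch_disable_all] = false
      end
    end

    def producers_disabled?(producer_class = nil)
      Thread.current[:sketch_disable_all] ||
        Thread.current[:sketch_disabled]&.include?(producer_class)
    end

    private

    # Adds the classes for the duration of the block, then removes them.
    def disable_classes(producer_classes)
      Thread.current[:sketch_disabled] ||= Set.new
      to_disable = producer_classes - Thread.current[:sketch_disabled].to_a
      Thread.current[:sketch_disabled] += to_disable
      yield
      Thread.current[:sketch_disabled] -= to_disable
    end
  end
end

class OrderProducer; end # hypothetical producer classes
class AuditProducer; end

ProducerSwitch.disable_producers(OrderProducer) do
  puts ProducerSwitch.producers_disabled?(OrderProducer) ? 'order: off' : 'order: on'
  puts ProducerSwitch.producers_disabled?(AuditProducer) ? 'audit: off' : 'audit: on'
end
```

Note that the return value is truthy or falsy rather than strictly true or false: with no flag and no set initialized, the expression evaluates to nil.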
.ssl_var_contents(filename) ⇒ String
Returns the contents of the file if filename is an existing path; otherwise returns filename unchanged.
# File 'lib/deimos.rb', line 119
def ssl_var_contents(filename)
  File.exist?(filename) ? File.read(filename) : filename
end
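The fallback branch means an SSL setting may hold either a path or the certificate text itself. A minimal sketch of both cases, using a standalone copy of the method so it runs without Deimos; the certificate text is invented for illustration.

```ruby
require 'tempfile'

# Standalone copy of the logic shown above.
def ssl_var_contents(filename)
  File.exist?(filename) ? File.read(filename) : filename
end

# Invented certificate text, for illustration only.
SKETCH_PEM = "-----BEGIN CERTIFICATE-----\nabc\n-----END CERTIFICATE-----\n"

Tempfile.create('ca_cert') do |file|
  file.write(SKETCH_PEM)
  file.flush
  # Case 1: the argument is a path to an existing file, so it is read.
  puts ssl_var_contents(file.path) == SKETCH_PEM
end

# Case 2: the argument is not a path on disk, so it is returned as-is.
puts ssl_var_contents(SKETCH_PEM) == SKETCH_PEM
```

One consequence of this design is that a mistyped path is not reported as an error; the path string itself is passed along as if it were the certificate.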
.start_db_backend!(thread_count: 1) ⇒ Object
Start the DB producers to send Kafka messages.
# File 'lib/deimos.rb', line 80
def start_db_backend!(thread_count: 1)
  if self.config.publish_backend != :db # rubocop:disable Style/IfUnlessModifier
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.new(self.config.logger)
  end
  executor = Deimos::Utils::Executor.new(producers,
                                         sleep_seconds: 5,
                                         logger: self.config.logger)
  signal_handler = Deimos::Utils::SignalHandler.new(executor)
  signal_handler.run!
end
.validate_db_backend ⇒ Object
Ensure everything is set up correctly for the DB backend.
# File 'lib/deimos.rb', line 67
def validate_db_backend
  begin
    require 'activerecord-import'
  rescue LoadError
    raise 'Cannot set publish_backend to :db without activerecord-import! Please add it to your Gemfile.'
  end
  if Phobos.config.producer_hash[:required_acks] != :all
    raise 'Cannot set publish_backend to :db unless required_acks is set to ":all" in phobos.yml!'
  end
end