Module: Deimos

Includes:
Instrumentation
Defined in:
lib/deimos.rb,
lib/deimos/message.rb,
lib/deimos/version.rb,
lib/deimos/consumer.rb,
lib/deimos/producer.rb,
lib/deimos/backends/db.rb,
lib/deimos/kafka_source.rb,
lib/deimos/metrics/mock.rb,
lib/deimos/test_helpers.rb,
lib/deimos/tracing/mock.rb,
lib/deimos/configuration.rb,
lib/deimos/kafka_message.rb,
lib/deimos/shared_config.rb,
lib/deimos/backends/kafka.rb,
lib/deimos/schema_coercer.rb,
lib/deimos/utils/executor.rb,
lib/deimos/avro_data_coder.rb,
lib/deimos/instrumentation.rb,
lib/deimos/metrics/datadog.rb,
lib/deimos/publish_backend.rb,
lib/deimos/tracing/datadog.rb,
lib/deimos/kafka_topic_info.rb,
lib/deimos/metrics/provider.rb,
lib/deimos/tracing/provider.rb,
lib/deimos/avro_data_decoder.rb,
lib/deimos/avro_data_encoder.rb,
lib/deimos/utils/db_producer.rb,
lib/deimos/utils/lag_reporter.rb,
lib/deimos/backends/kafka_async.rb,
lib/deimos/utils/signal_handler.rb,
lib/deimos/utils/inline_consumer.rb,
lib/deimos/active_record_consumer.rb,
lib/deimos/active_record_producer.rb,
lib/generators/deimos/db_backend_generator.rb

Overview

Top-level namespace for the Deimos library. It holds the shared configuration (.config, .configure), helpers for temporarily disabling producers, and the entry point for the DB publish backend.

Defined Under Namespace

Modules: Backends, Generators, Instrumentation, KafkaListener, KafkaSource, Metrics, SharedConfig, TestHelpers, Tracing, Utils

Classes: ActiveRecordConsumer, ActiveRecordProducer, AvroDataCoder, AvroDataDecoder, AvroDataEncoder, Configuration, Consumer, KafkaMessage, KafkaTopicInfo, Message, Producer, PublishBackend, Railtie, SchemaCoercer

Constant Summary

VERSION = '1.0.0'

Constants included from Instrumentation

Instrumentation::NAMESPACE


Class Attribute Details

.config ⇒ Object

Returns the value of attribute config.



# File 'lib/deimos.rb', line 42

def config
  @config
end

Class Method Details

._disable_producer_classes(producer_classes) ⇒ Object

:nodoc:



# File 'lib/deimos/producer.rb', line 37

def _disable_producer_classes(producer_classes)
  Thread.current[:frk_disabled_producers] ||= Set.new
  producers_to_disable = producer_classes -
                         Thread.current[:frk_disabled_producers].to_a
  Thread.current[:frk_disabled_producers] += producers_to_disable
  yield
  Thread.current[:frk_disabled_producers] -= producers_to_disable
end
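
The listing above restores the thread-local set only when the block returns normally. Below is a minimal standalone sketch of the same bookkeeping, with an `ensure` added so the set is also restored when the block raises. `ProducerGate`, its key, and the two producer classes are hypothetical names for illustration and are not part of Deimos.

```ruby
require 'set'

# Hypothetical stand-in for the thread-local bookkeeping shown above.
module ProducerGate
  KEY = :gate_disabled_producers # illustrative key, not the one Deimos uses

  def self.disable(*classes)
    Thread.current[KEY] ||= Set.new
    # Track only the classes this call newly disables, so entries added by
    # an outer block survive when an inner block exits.
    newly_disabled = classes - Thread.current[KEY].to_a
    Thread.current[KEY] += newly_disabled
    yield
  ensure
    # Runs on normal return and on exceptions alike.
    Thread.current[KEY] -= newly_disabled if newly_disabled
  end

  def self.disabled?(klass)
    Thread.current[KEY]&.include?(klass) || false
  end
end

class OrderProducer; end
class UserProducer; end

ProducerGate.disable(OrderProducer) do
  ProducerGate.disable(OrderProducer, UserProducer) do
    # Both classes are disabled inside the inner block.
  end
  # OrderProducer is still disabled here; UserProducer is not.
end
```

Subtracting the already-disabled classes before adding is what keeps nested calls from re-enabling a class that an outer block disabled.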

.configure {|config| ... } ⇒ Object

Configure Deimos.

Yields:

  • config (Configuration)



# File 'lib/deimos.rb', line 45

def configure
  first_time_config = self.config.nil?
  self.config ||= Configuration.new
  old_config = self.config.dup
  yield(config)

  # Don't re-configure Phobos every time
  if first_time_config || config.phobos_config_changed?(old_config)

    file = config.phobos_config_file
    phobos_config = YAML.load(ERB.new(File.read(File.expand_path(file))).result)

    configure_kafka_for_phobos(phobos_config)
    configure_loggers(phobos_config)

    Phobos.configure(phobos_config)
  end

  validate_db_backend if self.config.publish_backend == :db
end
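
The Phobos config file in the listing above is read, rendered through ERB, and then parsed as YAML. The following sketch isolates that step so it can be tried on its own; `load_erb_yaml`, the temporary file, and the `EXAMPLE_BROKER` variable are assumptions made for illustration, not Deimos API.

```ruby
require 'erb'
require 'yaml'
require 'tempfile'

# Render a YAML file through ERB, then parse the result, mirroring the
# File.read -> ERB -> YAML.load chain in the listing above.
def load_erb_yaml(path)
  rendered = ERB.new(File.read(File.expand_path(path))).result
  YAML.load(rendered)
end

# Hypothetical file contents and environment variable, for illustration only.
ENV['EXAMPLE_BROKER'] = 'localhost:9092'
Tempfile.create(['phobos_example', '.yml']) do |file|
  file.write("kafka:\n  seed_brokers:\n    - <%= ENV['EXAMPLE_BROKER'] %>\n")
  file.flush
  settings = load_erb_yaml(file.path)
  puts settings['kafka']['seed_brokers'].inspect
end
```

Because ERB runs before parsing, values such as broker addresses can come from the environment rather than being hard-coded in the YAML file.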

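.configure_kafka_for_phobos(phobos_config) ⇒ Object

Copies the SSL settings (when ssl_enabled is set) and the seed broker from the Deimos config into the 'kafka' section of the Phobos config hash.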

Parameters:

  • phobos_config (Hash)


# File 'lib/deimos.rb', line 100

def configure_kafka_for_phobos(phobos_config)
  if config.ssl_enabled
    %w(ssl_ca_cert ssl_client_cert ssl_client_cert_key).each do |key|
      next if config.send(key).blank?

      phobos_config['kafka'][key] = ssl_var_contents(config.send(key))
    end
  end
  phobos_config['kafka']['seed_brokers'] = config.seed_broker if config.seed_broker
end
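
To see how such overrides affect a config hash, here is a reduced sketch that works without Deimos or ActiveSupport. `ExampleSettings` and `apply_kafka_overrides` are hypothetical, only one SSL key is handled, and a plain nil/empty check stands in for `blank?`.

```ruby
# Hypothetical settings object; the real Deimos configuration has more fields.
ExampleSettings = Struct.new(:ssl_enabled, :ssl_ca_cert, :seed_broker,
                             keyword_init: true)

# Copy non-empty overrides into the 'kafka' section of a config hash.
def apply_kafka_overrides(phobos_config, settings)
  kafka = (phobos_config['kafka'] ||= {})
  if settings.ssl_enabled
    cert = settings.ssl_ca_cert
    # Plain nil/empty check stands in for ActiveSupport's blank?.
    kafka['ssl_ca_cert'] = cert unless cert.nil? || cert.empty?
  end
  kafka['seed_brokers'] = settings.seed_broker if settings.seed_broker
  phobos_config
end

config = { 'kafka' => { 'seed_brokers' => ['from-yaml:9092'] } }
settings = ExampleSettings.new(ssl_enabled: true, ssl_ca_cert: '',
                               seed_broker: 'override:9092')
apply_kafka_overrides(config, settings)
puts config.inspect
```

The empty certificate is skipped, while the seed broker from the settings object replaces the value that came from the YAML file.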

.configure_loggers(phobos_config) ⇒ Object

Sets the custom Phobos and Kafka loggers in the Phobos config hash from the Deimos config.

Parameters:

  • phobos_config (Hash)


# File 'lib/deimos.rb', line 112

def configure_loggers(phobos_config)
  phobos_config['custom_logger'] = config.phobos_logger
  phobos_config['custom_kafka_logger'] = config.kafka_logger
end

.disable_producers(*producer_classes, &block) ⇒ Object

Run a block without allowing any messages to be produced to Kafka. Optionally add a list of producer classes to limit the disabling to those classes.

Parameters:

  • producer_classes (Array<Class>|Class)


# File 'lib/deimos/producer.rb', line 17

def disable_producers(*producer_classes, &block)
  if producer_classes.any?
    _disable_producer_classes(producer_classes, &block)
    return
  end

  if Thread.current[:frk_disable_all_producers] # nested disable block
    yield
    return
  end

  begin
    Thread.current[:frk_disable_all_producers] = true
    yield
  ensure
    Thread.current[:frk_disable_all_producers] = false
  end
end
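
The nested-block check in the listing above matters because the flag is reset in an `ensure`: without the early return, an inner block would clear the flag while the outer block is still running. A standalone sketch of that pattern follows; `AllProducersGate` and its key are hypothetical and not part of Deimos.

```ruby
# Hypothetical model of a thread-local "disable everything" flag.
module AllProducersGate
  KEY = :gate_disable_all # illustrative key, not the one Deimos uses

  def self.disable_all
    # Nested call: the outermost block owns the flag, so just run the block.
    return yield if Thread.current[KEY]

    begin
      Thread.current[KEY] = true
      yield
    ensure
      Thread.current[KEY] = false
    end
  end

  def self.disabled?
    Thread.current[KEY] == true
  end
end

AllProducersGate.disable_all do
  AllProducersGate.disable_all do
    # Still disabled inside the nested block.
  end
  puts AllProducersGate.disabled?                      # flag survives the inner block
  puts Thread.new { AllProducersGate.disabled? }.value # other threads are unaffected
end
```

Since the flag lives in `Thread.current`, disabling producers in one thread does not affect producers running in another.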

.producers_disabled?(producer_class = nil) ⇒ Boolean

Are producers disabled? Returns true if the global disable flag is set for the current thread, or if the given class has been individually disabled. With no class given, only the global flag is relevant.

Returns:

  • (Boolean)


# File 'lib/deimos/producer.rb', line 49

def producers_disabled?(producer_class=nil)
  Thread.current[:frk_disable_all_producers] ||
    Thread.current[:frk_disabled_producers]&.include?(producer_class)
end

.ssl_var_contents(filename) ⇒ String

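Returns the contents of the file if filename points to an existing file; otherwise returns the argument unchanged, treating it as the SSL value itself.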

Parameters:

  • filename (String)

    a file to read, or the contents of the SSL var

Returns:

  • (String)

    the contents of the file, or the given string if no such file exists



# File 'lib/deimos.rb', line 119

def ssl_var_contents(filename)
  File.exist?(filename) ? File.read(filename) : filename
end
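
The "path or literal value" behaviour can be checked in isolation. In this sketch `path_or_literal` is a hypothetical copy of the one-line logic above, and the temporary file and strings are made up for illustration.

```ruby
require 'tempfile'

# Return the file's contents when the argument names an existing file;
# otherwise treat the argument itself as the value.
def path_or_literal(value)
  File.exist?(value) ? File.read(value) : value
end

Tempfile.create('example_cert') do |file|
  file.write('PEM DATA FROM FILE')
  file.flush
  puts path_or_literal(file.path)         # contents read from disk
  puts path_or_literal('PEM DATA INLINE') # no such file, returned unchanged
end
```

This lets a setting hold either a certificate path or the certificate text itself, for example when the value is injected through an environment variable.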

.start_db_backend!(thread_count: 1) ⇒ Object

Start the DB producers to send Kafka messages.

Parameters:

  • thread_count (Integer) (defaults to: 1)

    the number of threads to start.



# File 'lib/deimos.rb', line 80

def start_db_backend!(thread_count: 1)
  if self.config.publish_backend != :db # rubocop:disable Style/IfUnlessModifier
    raise('Publish backend is not set to :db, exiting')
  end

  if thread_count.nil? || thread_count.zero?
    raise('Thread count is not given or set to zero, exiting')
  end

  producers = (1..thread_count).map do
    Deimos::Utils::DbProducer.new(self.config.logger)
  end
  executor = Deimos::Utils::Executor.new(producers,
                                         sleep_seconds: 5,
                                         logger: self.config.logger)
  signal_handler = Deimos::Utils::SignalHandler.new(executor)
  signal_handler.run!
end

.validate_db_backend ⇒ Object

Ensure everything is set up correctly for the DB backend.



# File 'lib/deimos.rb', line 67

def validate_db_backend
  begin
    require 'activerecord-import'
  rescue LoadError
    raise 'Cannot set publish_backend to :db without activerecord-import! Please add it to your Gemfile.'
  end
  if Phobos.config.producer_hash[:required_acks] != :all
    raise 'Cannot set publish_backend to :db unless required_acks is set to ":all" in phobos.yml!'
  end
end
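
The first check above is an instance of a common pattern: require an optional dependency and convert `LoadError` into a message that tells the user what to install. A generic sketch follows; `require_optional!` and its `feature:` parameter are hypothetical names, not Deimos API.

```ruby
# Require an optional dependency, turning LoadError into an actionable message.
def require_optional!(library, feature:)
  require library
  true
rescue LoadError
  raise "Cannot use #{feature} without #{library}! Please add it to your Gemfile."
end

require_optional!('json', feature: 'JSON export') # stdlib, so this succeeds
```

Failing at configuration time, rather than when the first message is published, surfaces a missing gem as early as possible.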