Class: Rimless::ConsumerApp

Inherits:
Karafka::App
  • Object
show all
Defined in:
lib/rimless/consumer.rb

Overview

The global rimless Apache Kafka consumer application based on the Karafka framework.

rubocop:disable Style/ClassVars – because we just work as a singleton

Constant Summary collapse

@@rimless_initialized =

We track our own initialization with this class variable

false

Class Method Summary collapse

Class Method Details

.configureRimless::ConsumerApp

Allows the user to re-configure the Karafka application if this is needed. (eg. to set some ruby-kafka driver default settings, etc)

Returns:



123
124
125
126
# File 'lib/rimless/consumer.rb', line 123

def configure(&)
  setup(&)
  self
end

.initialize!Rimless::ConsumerApp

Initialize the Karafka framework and our global consumer application with all our conventions/opinions.

Returns:



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rimless/consumer.rb', line 17

def initialize!
  # When already initialized, skip it
  return self if @@rimless_initialized

  # Initialize all the parts one by one
  initialize_rails!
  initialize_monitors!
  initialize_karafka!
  initialize_logger!
  initialize_code_reload!

  # Load the custom Karafka boot file when it exists, it contains
  # custom configurations and the topic/consumer routing table
  require ::Karafka.boot_file if ::Karafka.boot_file.exist?

  # Set our custom initialization process as completed to
  # skip subsequent calls
  @@rimless_initialized = true
  self
end

.initialize_code_reload!Object

Perform code hot-reloading when we are in Rails and in development mode.



111
112
113
114
115
116
117
# File 'lib/rimless/consumer.rb', line 111

def initialize_code_reload!
  return unless defined?(Rails) && Rails.env.development?

  ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new(
                                *Rails.application.reloaders
                              ))
end

.initialize_karafka!Object

Configure the pure basics on the Karafka application.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/rimless/consumer.rb', line 79

def initialize_karafka!
  setup do |config|
    mapper = Rimless::Karafka::PassthroughMapper.new
    config.consumer_mapper = config.topic_mapper = mapper
    config.deserializer = Rimless::Karafka::AvroDeserializer.new
    config.kafka.seed_brokers = Rimless.configuration.kafka_brokers
    config.client_id = Rimless.configuration.client_id
    config.logger = Rimless.logger
    config.backend = :sidekiq
    config.batch_fetching = true
    config.batch_consuming = false
    config.shutdown_timeout = 10
  end
end

.initialize_logger!Object

When we run in development mode, we want to write the logs to file and to stdout.



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rimless/consumer.rb', line 96

def initialize_logger!
  # Skip when configured not to extend the logger
  return unless Rimless.configuration.extend_dev_logger

  # Skip when not in development environment or in the server process
  return unless Rimless.env.development? && server?

  $stdout.sync = true
  Rimless.logger.extend(ActiveSupport::Logger.broadcast(
                          ActiveSupport::Logger.new($stdout)
                        ))
end

.initialize_monitors!Object

We like to listen to instrumentation and logging events to allow our users to handle them like they need.



68
69
70
71
72
73
74
75
76
# File 'lib/rimless/consumer.rb', line 68

def initialize_monitors!
  [
    WaterDrop::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::ProctitleListener.new
  ].each do |listener|
    ::Karafka.monitor.subscribe(listener)
  end
end

.initialize_rails!Object

Check if Rails is available and not already initialized, then initialize it.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/rimless/consumer.rb', line 40

def initialize_rails!
  rails_env = ::Karafka.root.join('config', 'environment.rb')

  # Stop, when Rails is already initialized
  return if defined? Rails

  # Stop, when there is no Rails at all
  return unless rails_env.exist?

  ENV['RAILS_ENV'] ||= 'development'
  ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil)

  # This is relevant for Karafka server processes, as the +karafka.rb+
  # root file just requires +rimless+ and then we require
  # +karafka-sidekiq-backend+ which in fact requires +sidekiq+ before
  # +rails+ was required. We cannot change the order here, but we can
  # explicitly load the Sidekiq/Rails integration as we know at this
  # point that we should load Rails and we're going to use Sidekiq, too.
  # See: https://bit.ly/3D8ZHj3
  require 'sidekiq/rails'

  require rails_env
  require 'rimless/railtie'
  Rails.application.eager_load!
end

.server?Boolean

Check if we run as the Karafka server (consumer) process or not.

Returns:

  • (Boolean)

    whenever we run as the Karafka server or not



188
189
190
# File 'lib/rimless/consumer.rb', line 188

def server?
  $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
end

.topic_names(parts) ⇒ Array<String>

Build the conventional Apache Kafka topic names from the given parts. This allows various forms like single strings/symbols and a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names:

Array<String, Symbol>

}+. This allows the maximum of flexibility.

Parameters:

  • parts (String, Symbol, Hash{Symbol => Mixed})

    the topic name parts

Returns:

  • (Array<String>)

    the full topic names



173
174
175
176
177
178
179
180
181
182
183
# File 'lib/rimless/consumer.rb', line 173

def topic_names(parts)
  # We have a single app, but multiple names so we handle them
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      Rimless.topic(parts.merge(name: name))
    end
  end

  # We cannot handle the given input
  [Rimless.topic(parts)]
end

.topics(topics = []) { ... } ⇒ Object

Configure the topics-consumer routing table in a lean way.

Examples:

topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)

Examples:

topics do
  topic('name') do
    consumer CustomConsumer
  end
end

Parameters:

  • topics (Hash{Hash => Class}) (defaults to: [])

    the topic to consumer mapping

Yields:

  • the given block on the routing table



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/rimless/consumer.rb', line 145

def topics(topics = [], &block)
  consumer_groups.draw do
    consumer_group(Rimless.configuration.client_id) do
      instance_exec(&block) if block_given?

      topics.each do |topic_parts, dest_consumer|
        Rimless.consumer.topic_names(topic_parts).each do |topic_name|
          topic(topic_name) do
            consumer dest_consumer
            worker Rimless::ConsumerJob
            interchanger Rimless::Karafka::Base64Interchanger.new
          end
        end
      end
    end
  end

  self
end