Class: Rimless::ConsumerApp
- Inherits: Karafka::App
  - Object
    - Karafka::App
      - Rimless::ConsumerApp
- Defined in: lib/rimless/consumer.rb
Overview
The global rimless Apache Kafka consumer application based on the Karafka framework.
rubocop:disable Style/ClassVars – because we just work as a singleton
Constant Summary
- @@rimless_initialized = false
  We track our own initialization with this class variable.
Class Method Summary
- .configure ⇒ Rimless::ConsumerApp
  Allows the user to re-configure the Karafka application if this is needed.
- .initialize! ⇒ Rimless::ConsumerApp
  Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
- .initialize_code_reload! ⇒ Object
  Perform code hot-reloading when we are in Rails and in development mode.
- .initialize_karafka! ⇒ Object
  Configure the pure basics on the Karafka application.
- .initialize_logger! ⇒ Object
  When we run in development mode, we want to write the logs to file and to stdout.
- .initialize_monitors! ⇒ Object
  We subscribe to instrumentation and logging events so our users can handle them as they need.
- .initialize_rails! ⇒ Object
  Check if Rails is available and not already initialized, then initialize it.
- .server? ⇒ Boolean
  Check if we run as the Karafka server (consumer) process or not.
- .topic_names(parts) ⇒ Array<String>
  Build the conventional Apache Kafka topic names from the given parts.
- .topics(topics = []) { ... } ⇒ Object
  Configure the topics-consumer routing table in a lean way.
Class Method Details
.configure ⇒ Rimless::ConsumerApp
Allows the user to re-configure the Karafka application if this is needed (e.g. to set some ruby-kafka driver default settings).
# File 'lib/rimless/consumer.rb', line 128
def configure(&)
  setup(&)
  self
end
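The chainable pattern behind .configure (forward the block to a setup method, then return the class itself) can be sketched in isolation. This is a minimal illustration only: `ConfigurableSketch`, its `Settings` struct and its `setup` method are hypothetical stand-ins and not the Karafka setup API.

```ruby
# Hypothetical stand-in class; it only mimics the "forward the block,
# return self" shape of the listing above.
class ConfigurableSketch
  Settings = Struct.new(:client_id, :shutdown_timeout)

  # Lazily built settings object with assumed default values.
  def self.settings
    @settings ||= Settings.new('default', 10)
  end

  # Stand-in for the framework's setup: yields the settings for mutation.
  def self.setup
    yield settings
  end

  # Forwards the block to setup and returns the class for chaining.
  def self.configure(&block)
    setup(&block)
    self
  end
end

app = ConfigurableSketch.configure { |config| config.shutdown_timeout = 30 }
puts app.settings.shutdown_timeout
```

Returning the class rather than the block result is what allows further calls to be chained onto the configured application.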
.initialize! ⇒ Rimless::ConsumerApp
Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
# File 'lib/rimless/consumer.rb', line 17
def initialize!
  # When already initialized, skip it
  return self if @@rimless_initialized

  # Initialize all the parts one by one
  initialize_rails!
  initialize_monitors!
  initialize_karafka!
  initialize_logger!
  initialize_code_reload!

  # Load the custom Karafka boot file when it exists, it contains
  # custom configurations and the topic/consumer routing table
  require ::Karafka.boot_file if ::Karafka.boot_file.exist?

  # Set our custom initialization process as completed to
  # skip subsequent calls
  @@rimless_initialized = true
  self
end
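The once-only guard used by .initialize! can be illustrated without Karafka. The sketch below assumes a hypothetical `BootSketch` class with a counter in place of the real initialization steps; it only shows that repeated calls do the work a single time and always return the class.

```ruby
# Hypothetical stand-in; the counter replaces the real boot steps.
class BootSketch
  @@initialized = false
  @@boot_count = 0

  def self.initialize!
    # When already initialized, skip it
    return self if @@initialized

    # Placeholder for the actual initialization work
    @@boot_count += 1

    # Mark the process as completed to skip subsequent calls
    @@initialized = true
    self
  end

  # Exposes how often the guarded section actually ran.
  def self.boot_count
    @@boot_count
  end
end

BootSketch.initialize!.initialize!
puts BootSketch.boot_count
```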
.initialize_code_reload! ⇒ Object
Perform code hot-reloading when we are in Rails and in development mode.
# File 'lib/rimless/consumer.rb', line 116
def initialize_code_reload!
  return unless defined?(Rails) && Rails.env.development?

  ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new(
    *Rails.application.reloaders
  ))
end
.initialize_karafka! ⇒ Object
Configure the pure basics on the Karafka application.
rubocop:disable Metrics/MethodLength – because of the various settings
rubocop:disable Metrics/AbcSize – ditto
# File 'lib/rimless/consumer.rb', line 82
def initialize_karafka!
  setup do |config|
    mapper = Rimless::Karafka::PassthroughMapper.new
    config.consumer_mapper = config.topic_mapper = mapper
    config.deserializer = Rimless::Karafka::AvroDeserializer.new
    config.kafka.seed_brokers = Rimless.configuration.kafka_brokers
    config.client_id = Rimless.configuration.client_id
    config.logger = Rimless.logger
    config.backend = :sidekiq
    config.batch_fetching = true
    config.batch_consuming = false
    config.shutdown_timeout = 10
  end
end
.initialize_logger! ⇒ Object
When we run in development mode, we want to write the logs to file and to stdout.
# File 'lib/rimless/consumer.rb', line 101
def initialize_logger!
  # Skip when configured not to extend the logger
  return unless Rimless.configuration.extend_dev_logger

  # Skip when not in the development environment or not in the server process
  return unless Rimless.env.development? && server?

  $stdout.sync = true
  Rimless.logger.extend(ActiveSupport::Logger.broadcast(
    ActiveSupport::Logger.new($stdout)
  ))
end
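The idea of extending an existing logger so that every message also reaches a second destination can be sketched with the Ruby standard library alone. The `broadcast_to` helper below is hypothetical and is not the ActiveSupport implementation; the two `StringIO` objects are assumptions that stand in for the log file and for stdout.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper (not part of Rimless or ActiveSupport): builds a module
# that forwards every log call to a second logger before logging as usual.
def broadcast_to(other_logger)
  Module.new do
    define_method(:add) do |severity, message = nil, progname = nil, &block|
      other_logger.add(severity, message, progname, &block)
      super(severity, message, progname, &block)
    end
  end
end

file_io = StringIO.new   # stands in for the log file
stdout_io = StringIO.new # stands in for $stdout

logger = Logger.new(file_io)
logger.extend(broadcast_to(Logger.new(stdout_io)))
logger.info('consumer booted')

puts file_io.string.include?('consumer booted')
puts stdout_io.string.include?('consumer booted')
```

Extending the single logger instance, rather than replacing it, keeps every existing reference to that logger valid while adding the second output.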
.initialize_monitors! ⇒ Object
We subscribe to instrumentation and logging events so our users can handle them as they need.
# File 'lib/rimless/consumer.rb', line 68
def initialize_monitors!
  [
    WaterDrop::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::ProctitleListener.new
  ].each do |listener|
    ::Karafka.monitor.subscribe(listener)
  end
end
.initialize_rails! ⇒ Object
Check if Rails is available and not already initialized, then initialize it.
# File 'lib/rimless/consumer.rb', line 40
def initialize_rails!
  rails_env = ::Karafka.root.join('config', 'environment.rb')

  # Stop, when Rails is already initialized
  return if defined? Rails

  # Stop, when there is no Rails at all
  return unless rails_env.exist?

  ENV['RAILS_ENV'] ||= 'development'
  ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil)

  # This is relevant for Karafka server processes, as the +karafka.rb+
  # root file just requires +rimless+ and then we require
  # +karafka-sidekiq-backend+ which in fact requires +sidekiq+ before
  # +rails+ was required. We cannot change the order here, but we can
  # explicitly load the Sidekiq/Rails integration as we know at this
  # point that we should load Rails and we're going to use Sidekiq, too.
  # See: https://bit.ly/3D8ZHj3
  require 'sidekiq/rails'

  require rails_env
  require 'rimless/railtie'
  Rails.application.eager_load!
end
.server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
# File 'lib/rimless/consumer.rb', line 196
def server?
  $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
end
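To see which invocations this check accepts, the same condition can be written with the program name and arguments passed in explicitly. `karafka_server?` and its parameters are hypothetical names used only for this sketch.

```ruby
# Hypothetical variant of the check with injectable inputs, so it can be
# exercised without depending on the real process name and arguments.
def karafka_server?(program_name = $PROGRAM_NAME, argv = ARGV)
  program_name.end_with?('karafka') && argv.include?('server')
end

puts karafka_server?('/usr/local/bin/karafka', ['server'])
puts karafka_server?('bin/rails', ['server'])
puts karafka_server?('karafka', ['console'])
```

Both conditions must hold: the executable has to be `karafka`, and `server` has to be among the arguments.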
.topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts. This accepts various forms, such as a single string/symbol or a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names: [Array<String, Symbol>] }+, for maximum flexibility.
# File 'lib/rimless/consumer.rb', line 181
def topic_names(parts)
  # We have a single app, but multiple names so we handle them
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      Rimless.topic(parts.merge(name: name))
    end
  end

  # Otherwise we build a single topic name from the given parts
  [Rimless.topic(parts)]
end
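The branching between the `names:` form and every other input can be tried out with a stand-in for `Rimless.topic`. In this sketch `build_topic` is hypothetical and simply joins app and name with a dot; the real naming convention (for example any environment prefix) is not reproduced here.

```ruby
# Hypothetical stand-in for Rimless.topic; the joined "app.name" format
# is an assumption made for illustration only.
def build_topic(parts)
  return parts.to_s unless parts.is_a?(Hash)

  "#{parts.fetch(:app)}.#{parts.fetch(:name)}"
end

# Same branching as the listing above, using the stand-in helper.
def sketch_topic_names(parts)
  # One app with multiple names: build one topic name per entry
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map { |name| build_topic(parts.merge(name: name)) }
  end

  # Any other form yields exactly one topic name
  [build_topic(parts)]
end

puts sketch_topic_names(:users).inspect
puts sketch_topic_names({ app: :test_app, name: :admins }).inspect
puts sketch_topic_names({ app: :test_app, names: %i[users admins] }).inspect
```

The method always returns an array, so callers can iterate over the result without checking which input form was used.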
.topics(topics = []) { ... } ⇒ Object
Configure the topics-consumer routing table in a lean way.
Examples:
topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
Examples:
topics do
  topic('name') do
    consumer CustomConsumer
  end
end
rubocop:disable Metrics/MethodLength – because of the Karafka DSL
# File 'lib/rimless/consumer.rb', line 152
def topics(topics = [], &block)
  consumer_groups.draw do
    consumer_group(Rimless.configuration.client_id) do
      instance_exec(&block) if block_given?

      topics.each do |topic_parts, dest_consumer|
        Rimless.consumer.topic_names(topic_parts).each do |topic_name|
          topic(topic_name) do
            consumer dest_consumer
            worker Rimless::ConsumerJob
            interchanger Rimless::Karafka::Base64Interchanger.new
          end
        end
      end
    end
  end

  self
end
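How the block style and the hash style can feed the same routing table is sketched below with a tiny, self-contained DSL. `RoutingSketch`, `TopicBuilder` and the consumer classes are hypothetical and much simpler than the Karafka routing DSL; the sketch only mirrors the order used above (block first via instance_exec, then the hash entries).

```ruby
# Hypothetical, simplified routing table; not the Karafka DSL itself.
class RoutingSketch
  # Collects the settings declared inside a topic block.
  class TopicBuilder
    attr_reader :consumer_class

    def consumer(klass)
      @consumer_class = klass
    end
  end

  attr_reader :routes

  def initialize
    @routes = {}
  end

  # Registers one topic; the block runs against a fresh TopicBuilder.
  def topic(name, &block)
    builder = TopicBuilder.new
    builder.instance_exec(&block)
    @routes[name.to_s] = builder.consumer_class
  end

  # Evaluates the optional block first, then adds the hash entries.
  def draw(topics = {}, &block)
    instance_exec(&block) if block_given?

    topics.each do |name, dest_consumer|
      topic(name) { consumer dest_consumer }
    end

    self
  end
end

CustomConsumer = Class.new
YourConsumer = Class.new

table = RoutingSketch.new.draw('users' => YourConsumer) do
  topic('name') do
    consumer CustomConsumer
  end
end

puts table.routes.inspect
```

Evaluating the block with instance_exec is what lets the caller write `topic(...)` without an explicit receiver, as in the second example above.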