Class: Rimless::ConsumerApp
- Inherits:
- Karafka::App
- Inheritance chain: Object, Karafka::App, Rimless::ConsumerApp
- Defined in:
- lib/rimless/consumer.rb
Overview
The global rimless Apache Kafka consumer application based on the Karafka framework.
The class works as a singleton, which is why it tracks its state in a class variable and disables RuboCop's Style/ClassVars cop.
Constant Summary
- @@rimless_initialized = false
We track our own initialization with this class variable.
Class Method Summary
- .configure ⇒ Rimless::ConsumerApp
  Allows the user to re-configure the Karafka application if this is needed.
- .initialize! ⇒ Rimless::ConsumerApp
  Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
- .initialize_code_reload! ⇒ Object
  Perform code hot-reloading when we are in Rails and in development mode.
- .initialize_karafka! ⇒ Object
  Configure the pure basics on the Karafka application.
- .initialize_logger! ⇒ Object
  When we run in development mode, we want to write the logs to file and to stdout.
- .initialize_monitors! ⇒ Object
  We listen to instrumentation and logging events so that our users can handle them as they need.
- .initialize_rails! ⇒ Object
  Check if Rails is available and not already initialized, then initialize it.
- .server? ⇒ Boolean
  Check if we run as the Karafka server (consumer) process or not.
- .topic_names(parts) ⇒ Array<String>
  Build the conventional Apache Kafka topic names from the given parts.
- .topics(topics = []) { ... } ⇒ Object
  Configure the topics-consumer routing table in a lean way.
Class Method Details
.configure ⇒ Rimless::ConsumerApp
Allows the user to re-configure the Karafka application if this is needed (e.g., to set some ruby-kafka driver default settings).
# File 'lib/rimless/consumer.rb', line 123
def configure(&)
  setup(&)
  self
end
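The forward-the-block-and-return-self shape of `configure` can be illustrated with a minimal, self-contained sketch. `DemoApp`, its `Config` struct, and its `setup` method are hypothetical stand-ins for the Karafka application and are not part of Rimless or Karafka.

```ruby
# Hypothetical stand-in for a Karafka-style application class.
class DemoApp
  Config = Struct.new(:client_id, :shutdown_timeout)

  # Lazily built settings object (an assumption made for this sketch).
  def self.config
    @config ||= Config.new('demo', 10)
  end

  # Stand-in for the framework's setup: yields the settings object.
  def self.setup
    yield config
  end

  # Same shape as the documented method: forward the block, return self.
  def self.configure(&block)
    setup(&block)
    self
  end
end

result = DemoApp.configure { |config| config.shutdown_timeout = 30 }
```

Because the class itself is returned, further class-level calls can be chained directly onto `configure`.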
.initialize! ⇒ Rimless::ConsumerApp
Initialize the Karafka framework and our global consumer application with all our conventions/opinions.
# File 'lib/rimless/consumer.rb', line 17
def initialize!
  # When already initialized, skip it
  return self if @@rimless_initialized

  # Initialize all the parts one by one
  initialize_rails!
  initialize_monitors!
  initialize_karafka!
  initialize_logger!
  initialize_code_reload!

  # Load the custom Karafka boot file when it exists, it contains
  # custom configurations and the topic/consumer routing table
  require ::Karafka.boot_file if ::Karafka.boot_file.exist?

  # Set our custom initialization process as completed to
  # skip subsequent calls
  @@rimless_initialized = true
  self
end
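The guard around the initialization steps makes repeated calls harmless. A minimal sketch of that idempotent pattern follows; `DemoBoot` and its `runs` counter are hypothetical and exist only to make the effect observable.

```ruby
# Hypothetical singleton-style class illustrating the initialization guard.
class DemoBoot
  @@initialized = false
  @@runs = 0

  def self.initialize!
    # Skip all work on repeated calls
    return self if @@initialized

    # Stand-in for the real initialization steps
    @@runs += 1

    @@initialized = true
    self
  end

  # Number of times the initialization body actually ran
  def self.runs
    @@runs
  end
end

# Chaining works because every call returns the class itself
DemoBoot.initialize!.initialize!
```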
.initialize_code_reload! ⇒ Object
Perform code hot-reloading when we are in Rails and in development mode.
# File 'lib/rimless/consumer.rb', line 111
def initialize_code_reload!
  return unless defined?(Rails) && Rails.env.development?

  ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new(
    *Rails.application.reloaders
  ))
end
.initialize_karafka! ⇒ Object
Configure the pure basics on the Karafka application.
# File 'lib/rimless/consumer.rb', line 79
def initialize_karafka!
  setup do |config|
    mapper = Rimless::Karafka::PassthroughMapper.new
    config.consumer_mapper = config.topic_mapper = mapper
    config.deserializer = Rimless::Karafka::AvroDeserializer.new
    config.kafka.seed_brokers = Rimless.configuration.kafka_brokers
    config.client_id = Rimless.configuration.client_id
    config.logger = Rimless.logger
    config.backend = :sidekiq
    config.batch_fetching = true
    config.batch_consuming = false
    config.shutdown_timeout = 10
  end
end
.initialize_logger! ⇒ Object
When we run in development mode, we want to write the logs to file and to stdout.
# File 'lib/rimless/consumer.rb', line 96
def initialize_logger!
  # Skip when configured not to extend the logger
  return unless Rimless.configuration.extend_dev_logger

  # Skip when not in development environment or in the server process
  return unless Rimless.env.development? && server?

  $stdout.sync = true
  Rimless.logger.extend(ActiveSupport::Logger.broadcast(
    ActiveSupport::Logger.new($stdout)
  ))
end
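The idea of writing each log entry to two destinations can be sketched with the Ruby standard library alone. This is a different technique from the ActiveSupport broadcast used above: a hypothetical `TeeIO` object fans out writes, and two `StringIO` instances stand in for the log file and stdout.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: forwards every write to several IO targets.
class TeeIO
  def initialize(*targets)
    @targets = targets
  end

  def write(*args)
    @targets.each { |target| target.write(*args) }
  end

  def close
    # Leave the targets open; their owners are responsible for closing them
  end
end

file_like = StringIO.new   # stands in for the log file
stdout_like = StringIO.new # stands in for $stdout

logger = Logger.new(TeeIO.new(file_like, stdout_like))
logger.info('consumer booted')
```

Both targets receive the identical formatted line, which is the effect the development logger extension aims for.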
.initialize_monitors! ⇒ Object
We listen to instrumentation and logging events so that our users can handle them as they need.
# File 'lib/rimless/consumer.rb', line 68
def initialize_monitors!
  [
    WaterDrop::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::StdoutListener.new,
    ::Karafka::Instrumentation::ProctitleListener.new
  ].each do |listener|
    ::Karafka.monitor.subscribe(listener)
  end
end
.initialize_rails! ⇒ Object
Check if Rails is available and not already initialized, then initialize it.
# File 'lib/rimless/consumer.rb', line 40
def initialize_rails!
  rails_env = ::Karafka.root.join('config', 'environment.rb')

  # Stop, when Rails is already initialized
  return if defined? Rails

  # Stop, when there is no Rails at all
  return unless rails_env.exist?

  ENV['RAILS_ENV'] ||= 'development'
  ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil)

  # This is relevant for Karafka server processes, as the +karafka.rb+
  # root file just requires +rimless+ and then we require
  # +karafka-sidekiq-backend+ which in fact requires +sidekiq+ before
  # +rails+ was required. We cannot change the order here, but we can
  # explicitly load the Sidekiq/Rails integration as we know at this
  # point that we should load Rails and we're going to use Sidekiq, too.
  # See: https://bit.ly/3D8ZHj3
  require 'sidekiq/rails'

  require rails_env
  require 'rimless/railtie'
  Rails.application.eager_load!
end
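The two environment assignments in this method have a subtle consequence: `RAILS_ENV` is only defaulted, while `KARAFKA_ENV` is always overwritten with its value. A small sketch makes this visible; `apply_env_defaults` is a hypothetical helper that operates on a plain Hash instead of the real `ENV`.

```ruby
# Applies the same defaulting rules to any Hash-like environment
# (hypothetical helper, so the real ENV stays untouched).
def apply_env_defaults(env)
  env['RAILS_ENV'] ||= 'development'
  env['KARAFKA_ENV'] = env['RAILS_ENV']
  env
end

unset  = apply_env_defaults({})
preset = apply_env_defaults(
  { 'RAILS_ENV' => 'production', 'KARAFKA_ENV' => 'test' }
)
```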
.server? ⇒ Boolean
Check if we run as the Karafka server (consumer) process or not.
# File 'lib/rimless/consumer.rb', line 188
def server?
  $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server')
end
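To see which invocations this predicate matches, the same check can be written with its inputs passed in explicitly. `karafka_server?` and the example paths are hypothetical and serve only to exercise the logic in isolation.

```ruby
# Same predicate as above, but with the program name and arguments
# passed in (an assumption made so the sketch can run anywhere).
def karafka_server?(program_name, argv)
  program_name.end_with?('karafka') && argv.include?('server')
end

server  = karafka_server?('/usr/local/bin/karafka', %w[server])
console = karafka_server?('/usr/local/bin/karafka', %w[console])
rails   = karafka_server?('bin/rails', %w[server])
```

Only the first call satisfies both conditions: the executable name ends with `karafka` and the arguments contain `server`.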
.topic_names(parts) ⇒ Array<String>
Build the conventional Apache Kafka topic names from the given parts. This accepts various forms, such as a single string/symbol or a hash in the form of +{ app: [String, Symbol], name: [String, Symbol], names: [Array<String, Symbol>] }+, which allows maximum flexibility.
# File 'lib/rimless/consumer.rb', line 173
def topic_names(parts)
  # We have a single app, but multiple names so we handle them
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map do |name|
      Rimless.topic(parts.merge(name: name))
    end
  end

  # We cannot handle the given input
  [Rimless.topic(parts)]
end
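The branching on `:names` can be tried out without Rimless installed. In the sketch below, `demo_topic` is a hypothetical replacement for `Rimless.topic`; its "app.name" output format is an assumption for illustration and not the real naming convention.

```ruby
# Hypothetical replacement for Rimless.topic. The "app.name" format is
# an assumption for this sketch only.
def demo_topic(parts)
  return parts.to_s unless parts.is_a?(Hash)

  "#{parts[:app]}.#{parts[:name]}"
end

# Mirrors the documented branching: a :names list fans out into one
# topic per name, anything else yields a single-element array.
def demo_topic_names(parts)
  if parts.is_a?(Hash) && parts.key?(:names)
    return parts[:names].map { |name| demo_topic(parts.merge(name: name)) }
  end

  [demo_topic(parts)]
end

single = demo_topic_names({ app: :test_app, name: :admins })
multi  = demo_topic_names({ app: :test_app, names: %i[users admins] })
plain  = demo_topic_names(:payments)
```

Whatever the input form, the result is always an array, so callers can iterate over it uniformly.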
.topics(topics = []) { ... } ⇒ Object
Configure the topics-consumer routing table in a lean way.
Examples:
topics({ app: :test_app, name: :admins } => YourConsumer)
topics({ app: :test_app, names: %i[users admins] } => YourConsumer)
Examples:
topics do
  topic('name') do
    consumer CustomConsumer
  end
end
# File 'lib/rimless/consumer.rb', line 145
def topics(topics = [], &block)
  consumer_groups.draw do
    consumer_group(Rimless.configuration.client_id) do
      instance_exec(&block) if block_given?

      topics.each do |topic_parts, dest_consumer|
        Rimless.consumer.topic_names(topic_parts).each do |topic_name|
          topic(topic_name) do
            consumer dest_consumer
            worker Rimless::ConsumerJob
            interchanger Rimless::Karafka::Base64Interchanger.new
          end
        end
      end
    end
  end

  self
end
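The nested iteration in this method turns a compact hash into one route per topic. A rough, framework-free sketch of that expansion follows; the consumer classes, the `name_builder` lambda, and its "app.name" format are all hypothetical.

```ruby
# Hypothetical consumer classes used only as routing targets.
class UsersConsumer; end
class AuditConsumer; end

# Hypothetical name builder; the "app.name" format is an assumption.
name_builder = lambda do |parts|
  names = parts.key?(:names) ? parts[:names] : [parts[:name]]
  names.map { |name| "#{parts[:app]}.#{name}" }
end

routing = {
  { app: :test_app, names: %i[users admins] } => UsersConsumer,
  { app: :test_app, name: :audits } => AuditConsumer
}

# Expand each entry into one [topic name, consumer] pair per topic.
routes = routing.flat_map do |parts, consumer|
  name_builder.call(parts).map { |topic_name| [topic_name, consumer] }
end
```

Each pair in `routes` corresponds to one `topic(...)` block that the real method would register inside the consumer group.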