Class: Racecar::Config

Inherits:
KingKonf::Config
  • Object
show all
Defined in:
lib/racecar/config.rb

Constant Summary collapse

STATISTICS_DISABLED_VALUE =
0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(env: ENV) ⇒ Config

Returns a new instance of Config.



205
206
207
208
209
210
# File 'lib/racecar/config.rb', line 205

# Sets up a new configuration object.
#
# @param env [Object] forwarded unchanged as `env:` to the superclass
#   initializer; defaults to the process-wide ENV.
def initialize(env: ENV)
  super(env: env)

  # Defaults for the attributes that live in plain instance variables.
  @logger = Logger.new(STDOUT)
  @subscriptions = []
  @error_handler = proc {} # no-op until a handler is registered via #on_error
end

Instance Attribute Details

#consumer_class ⇒ Object

Returns the value of attribute consumer_class.



251
252
253
# File 'lib/racecar/config.rb', line 251

# Reader for the consumer class stored on this configuration.
#
# @return [Object, nil] the current value of @consumer_class; nil until
#   something has assigned it (#load_consumer_class does so through the
#   matching writer).
def consumer_class
  @consumer_class
end

#error_handler ⇒ Object (readonly)

The error handler must be set directly on the object.



189
190
191
# File 'lib/racecar/config.rb', line 189

# Reader for the currently registered error handler.
#
# #initialize seeds @error_handler with a no-op proc and #on_error replaces
# it with the block it is given.
#
# @return [Proc, nil] the current value of @error_handler
def error_handler
  @error_handler
end

#instrumenter ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/racecar/config.rb', line 273

# Returns the instrumenter for this configuration, building it on first use
# and reusing the same object afterwards.
#
# When ActiveSupport::Notifications is defined, an Instrumenter backed by it
# is created with `client_id` and `group_id` as its default payload.
# Otherwise a warning is logged and NullInstrumenter is used.
#
# @return [Object] the memoized instrumenter
def instrumenter
  return @instrumenter if @instrumenter

  payload = { client_id: client_id, group_id: group_id }

  @instrumenter =
    if defined?(ActiveSupport::Notifications)
      # ActiveSupport relies on `concurrent-ruby` without requiring it itself.
      require 'concurrent/utility/monotonic_time'
      Instrumenter.new(backend: ActiveSupport::Notifications, default_payload: payload)
    else
      logger.warn "ActiveSupport::Notifications not available, instrumentation is disabled"
      NullInstrumenter
    end
end

#logger ⇒ Object

Returns the value of attribute logger.



191
192
193
# File 'lib/racecar/config.rb', line 191

# Reader for the logger used by this configuration.
#
# #initialize sets @logger to a Logger writing to STDOUT.
#
# @return [Object] the current value of @logger
def logger
  @logger
end

#parallel_workers ⇒ Object

Returns the value of attribute parallel_workers.



191
192
193
# File 'lib/racecar/config.rb', line 191

# Reader for the parallel worker setting.
#
# #load_consumer_class copies this value from the consumer class through the
# matching writer.
#
# @return [Object, nil] the current value of @parallel_workers (nil when unset)
def parallel_workers
  @parallel_workers
end

#subscriptions ⇒ Object

Returns the value of attribute subscriptions.



191
192
193
# File 'lib/racecar/config.rb', line 191

# Reader for the subscriptions held by this configuration.
#
# #initialize starts with an empty array; #load_consumer_class replaces it
# with the consumer class's subscriptions through the matching writer.
#
# @return [Object] the current value of @subscriptions
def subscriptions
  @subscriptions
end

Instance Method Details

#inspect ⇒ Object



212
213
214
215
216
217
# File 'lib/racecar/config.rb', line 212

# Renders every configuration variable declared on the class as a
# "name = value" line, where the value is shown via its own #inspect.
#
# @return [String] one line per variable, joined with newlines
def inspect
  lines = self.class.variables.map do |variable|
    name = variable.name
    "#{name} = #{get(name).inspect}"
  end

  lines.join("\n")
end

#install_liveness_probe ⇒ Object



289
290
291
# File 'lib/racecar/config.rb', line 289

# Calls `install` on the liveness probe and hands the probe back.
#
# @return [Object] the object returned by #liveness_probe
def install_liveness_probe
  probe = liveness_probe
  probe.install
  probe
end

#liveness_probe ⇒ Object



293
294
295
296
297
298
299
300
# File 'lib/racecar/config.rb', line 293

# Returns the liveness probe, creating it on first use and reusing it after.
#
# The probe is built from ActiveSupport::Notifications plus the configured
# `liveness_probe_file_path` and `liveness_probe_max_interval`.
#
# @return [LivenessProbe] the memoized probe
def liveness_probe
  # Loaded here, on every call, so ActiveSupport is only needed when a probe is used.
  require "active_support/notifications"
  return @liveness_probe if @liveness_probe

  @liveness_probe = LivenessProbe.new(
    ActiveSupport::Notifications,
    liveness_probe_file_path,
    liveness_probe_max_interval
  )
end

#load_consumer_class(consumer_class) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# File 'lib/racecar/config.rb', line 233

# Copies consumer-level settings from the given consumer class onto this
# configuration.
#
# The group id is taken from the consumer class if it has one, otherwise the
# value already configured is kept. If neither exists, one is derived from
# the optional `group_id_prefix` and the dasherized class name.
# `max_wait_time` and `fetch_messages` likewise prefer the consumer's value
# and fall back to the configured one. `pidfile` defaults to
# "<group_id>.pid" only when it is not already set.
#
# @param consumer_class [Object] must respond to `name`, `group_id`,
#   `parallel_workers`, `subscriptions`, `max_wait_time` and `fetch_messages`
# @return [String] the resulting pidfile value
def load_consumer_class(consumer_class)
  self.consumer_class = consumer_class
  self.group_id = consumer_class.group_id || group_id

  unless group_id
    # Configurable and optional prefix; dropped below when nil.
    prefix = group_id_prefix
    # MyFunnyConsumer => my-funny-consumer
    dashed_name = consumer_class.name.gsub(/([a-z])([A-Z])/, '\1-\2').downcase
    self.group_id = [prefix, dashed_name].compact.join
  end

  self.parallel_workers = consumer_class.parallel_workers
  self.subscriptions = consumer_class.subscriptions
  self.max_wait_time = consumer_class.max_wait_time || max_wait_time
  self.fetch_messages = consumer_class.fetch_messages || fetch_messages
  self.pidfile ||= "#{group_id}.pid"
end

#max_wait_time_ms ⇒ Object



201
202
203
# File 'lib/racecar/config.rb', line 201

# Returns `max_wait_time` multiplied by 1000.
#
# NOTE(review): presumably `max_wait_time` is expressed in seconds, making
# this the millisecond equivalent — confirm against the option's definition.
#
# @return [Numeric] `max_wait_time * 1000`
def max_wait_time_ms
  max_wait_time * 1000
end

#on_error(&handler) ⇒ Object



253
254
255
# File 'lib/racecar/config.rb', line 253

# Registers the given block as the error handler, replacing whatever was
# stored in @error_handler before.
#
# @yield the block to store as the handler; its arguments are decided by
#   whoever invokes the handler, which is not visible here.
# @return [Proc, nil] the stored handler (nil when called without a block)
def on_error(&handler)
  @error_handler = handler
end

#rdkafka_consumer ⇒ Object



257
258
259
260
261
262
263
# File 'lib/racecar/config.rb', line 257

def rdkafka_consumer
  consumer_config = consumer.map do |param|
    param.split("=", 2).map(&:strip)
  end.to_h
  consumer_config.merge!(rdkafka_security_config)
  consumer_config
end

#rdkafka_producer ⇒ Object



265
266
267
268
269
270
271
# File 'lib/racecar/config.rb', line 265

def rdkafka_producer
  producer_config = producer.map do |param|
    param.split("=", 2).map(&:strip)
  end.to_h
  producer_config.merge!(rdkafka_security_config)
  producer_config
end

#statistics_interval_ms ⇒ Object



193
194
195
196
197
198
199
# File 'lib/racecar/config.rb', line 193

# Returns `statistics_interval` multiplied by 1000 when a statistics callback
# is registered on Rdkafka::Config, and STATISTICS_DISABLED_VALUE otherwise.
#
# NOTE(review): presumably `statistics_interval` is in seconds — confirm
# against the option's definition.
#
# @return [Numeric] the scaled interval, or STATISTICS_DISABLED_VALUE
def statistics_interval_ms
  return STATISTICS_DISABLED_VALUE unless Rdkafka::Config.statistics_callback

  statistics_interval * 1000
end

#validate! ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/racecar/config.rb', line 219

# Checks the configuration for invalid combinations of settings.
#
# @return [nil] when every check passes
# @raise [ConfigError] if `brokers` is empty, if `socket_timeout` is not
#   greater than `max_wait_time`, or if `max_pause_timeout` is set while
#   `pause_with_exponential_backoff` is disabled
def validate!
  raise ConfigError, "`brokers` must not be empty" if brokers.empty?
  raise ConfigError, "`socket_timeout` must be longer than `max_wait_time`" if socket_timeout <= max_wait_time

  # The remaining rule only applies when a maximum pause timeout is configured.
  return unless max_pause_timeout
  return if pause_with_exponential_backoff?

  raise ConfigError, "`max_pause_timeout` only makes sense when `pause_with_exponential_backoff` is enabled"
end