Module: Racecar

Defined in:
lib/racecar/erroneous_state_error.rb,
lib/racecar.rb,
lib/racecar/cli.rb,
lib/racecar/ctl.rb,
lib/racecar/pause.rb,
lib/racecar/config.rb,
lib/racecar/daemon.rb,
lib/racecar/heroku.rb,
lib/racecar/runner.rb,
lib/racecar/datadog.rb,
lib/racecar/message.rb,
lib/racecar/version.rb,
lib/racecar/consumer.rb,
lib/racecar/consumer_set.rb,
lib/racecar/instrumenter.rb,
lib/racecar/parallel_runner.rb,
lib/racecar/null_instrumenter.rb,
lib/racecar/message_delivery_error.rb,
lib/racecar/rails_config_file_loader.rb,
lib/generators/racecar/install_generator.rb,
lib/generators/racecar/consumer_generator.rb

Overview

`rd_kafka_offsets_store()` (et al.) returns an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). This prevents a race condition in which an application stores offsets after the assigned partitions have been revoked (revocation resets the stored offset). Those stale stored offsets could later be committed when the same partitions were assigned to this consumer again, effectively overwriting offsets committed by other consumers that had been assigned the same partitions in the meantime. This would typically cause the offsets to rewind and messages to be reprocessed. As an extra safeguard, the stored offset is now also reset when partitions are assigned (through `rd_kafka_*assign()`).
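
The behaviour described above can be illustrated with a toy model. This is only a sketch: `ToyOffsetStore` and `UnassignedPartitionError` are hypothetical names invented for the example and are not part of librdkafka or Racecar. The model rejects a store for an unassigned partition and clears stored offsets on both revocation and assignment.

```ruby
# Toy model only: none of these names are librdkafka or Racecar APIs.
class UnassignedPartitionError < StandardError; end

class ToyOffsetStore
  def initialize
    # partition => stored offset (nil if nothing stored yet).
    # The keys are the currently assigned partitions.
    @stored = {}
  end

  # Assigning partitions also resets any stored offsets.
  def assign(partitions)
    @stored = partitions.to_h { |partition| [partition, nil] }
  end

  # Revoking drops the assignment together with the stored offsets.
  def revoke
    @stored = {}
  end

  # Storing an offset for a partition that is not assigned is an error.
  def store(partition, offset)
    unless @stored.key?(partition)
      raise UnassignedPartitionError, "partition #{partition} is not assigned"
    end

    @stored[partition] = offset
  end

  def stored(partition)
    @stored[partition]
  end
end

offsets = ToyOffsetStore.new
offsets.assign([0, 1])
offsets.store(0, 42)
offsets.revoke

# A late store after revocation is rejected instead of silently kept.
late_store_rejected =
  begin
    offsets.store(0, 43)
    false
  rescue UnassignedPartitionError
    true
  end

# After reassignment nothing stale is left to commit.
offsets.assign([0, 1])
offset_after_reassign = offsets.stored(0)
```

In this model the late store cannot survive a rebalance, so a later commit has no stale offset to write.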

Defined Under Namespace

Modules: Datadog, Generators, Heroku, RailsConfigFileLoader
Classes: Cli, Config, ConfigError, Consumer, ConsumerSet, Ctl, Daemon, ErroneousStateError, Error, Instrumenter, Message, MessageDeliveryError, NullInstrumenter, ParallelRunner, Pause, Runner

Constant Summary

VERSION =
"2.8.1"

Class Method Details

.config ⇒ Object



# File 'lib/racecar.rb', line 22

def self.config
  @config ||= Config.new
end

.config=(config) ⇒ Object



# File 'lib/racecar.rb', line 26

def self.config=(config)
  @config = config
end

.configure {|config| ... } ⇒ Object

Yields: (config)



# File 'lib/racecar.rb', line 30

def self.configure
  yield config
end
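
A minimal sketch of how `.config`, `.config=` and `.configure` fit together. `RacecarSketch` and `SketchConfig` are hypothetical stand-ins so the snippet runs without the gem; the three method bodies mirror the listings above, and the attribute values are made up for illustration.

```ruby
# Hypothetical stand-in for Racecar::Config with only the attributes used here.
SketchConfig = Struct.new(:client_id, :group_id)

module RacecarSketch
  # Lazily builds the config on first access and memoizes it.
  def self.config
    @config ||= SketchConfig.new
  end

  # Lets callers swap in a fully built config object.
  def self.config=(config)
    @config = config
  end

  # Yields the memoized config so a block can mutate it in place.
  def self.configure
    yield config
  end
end

RacecarSketch.configure do |config|
  config.client_id = "billing"
  config.group_id  = "billing-consumers"
end

# The block received the memoized object, so the settings persist.
same_object = RacecarSketch.config.equal?(RacecarSketch.config)
group_id    = RacecarSketch.config.group_id
```

Because `.configure` yields the same object that `.config` returns, settings applied in the block are visible to every later caller, unless `.config=` replaces the object afterwards.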

.instrumenter ⇒ Object



# File 'lib/racecar.rb', line 42

def self.instrumenter
  @instrumenter ||= begin
    default_payload = { client_id: config.client_id, group_id: config.group_id }

    Instrumenter.new(default_payload).tap do |instrumenter|
      if instrumenter.backend == NullInstrumenter
        logger.warn "ActiveSupport::Notifications not available, instrumentation is disabled"
      end
    end
  end
end
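
The `@instrumenter ||= begin ... end` idiom above builds the instrumenter once and logs the warning at most once. The sketch below reproduces that pattern with hypothetical stand-ins (`InstrumenterSketch`, `NullBackendSketch`, `InstrumenterHolder`); it assumes nothing about the real `Instrumenter` beyond the `backend` reader used in the listing.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for NullInstrumenter.
module NullBackendSketch; end

# Hypothetical stand-in for Racecar::Instrumenter.
class InstrumenterSketch
  attr_reader :default_payload, :backend

  def initialize(default_payload, backend: NullBackendSketch)
    @default_payload = default_payload
    @backend = backend
  end
end

module InstrumenterHolder
  LOG_OUTPUT = StringIO.new
  LOGGER = Logger.new(LOG_OUTPUT)

  def self.instrumenter
    # The begin block runs only while @instrumenter is still nil.
    @instrumenter ||= begin
      payload = { client_id: "billing", group_id: "billing-consumers" }

      # tap returns the new instrumenter after running the check.
      InstrumenterSketch.new(payload).tap do |instrumenter|
        if instrumenter.backend == NullBackendSketch
          LOGGER.warn "instrumentation is disabled"
        end
      end
    end
  end
end

first  = InstrumenterHolder.instrumenter
second = InstrumenterHolder.instrumenter
warning_count = InstrumenterHolder::LOG_OUTPUT.string.scan("instrumentation is disabled").size
```

Both calls return the same object, and the warning is written only during the first call.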

.logger ⇒ Object



# File 'lib/racecar.rb', line 34

def self.logger
  config.logger
end

.logger=(logger) ⇒ Object



# File 'lib/racecar.rb', line 38

def self.logger=(logger)
  config.logger = logger
end

.run(processor) ⇒ Object



# File 'lib/racecar.rb', line 54

def self.run(processor)
  runner = Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter)

  if config.parallel_workers && config.parallel_workers > 1
    ParallelRunner.new(runner: runner, config: config, logger: logger).run
  else
    runner.run
  end
end
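
The branch in `.run` can be sketched in isolation. `RunnerSketch`, `ParallelRunnerSketch` and `run_sketch` are hypothetical stand-ins that only record which path was taken; they do not consume from Kafka or fork workers.

```ruby
# Hypothetical stand-in for Racecar::Runner.
class RunnerSketch
  def initialize(processor)
    @processor = processor
  end

  def run
    :single
  end
end

# Hypothetical stand-in for Racecar::ParallelRunner.
class ParallelRunnerSketch
  def initialize(runner:, workers:)
    @runner = runner
    @workers = workers
  end

  def run
    [:parallel, @workers]
  end
end

# Mirrors the dispatch in Racecar.run: wrap the runner only when
# more than one worker is requested.
def run_sketch(processor, parallel_workers:)
  runner = RunnerSketch.new(processor)

  # The nil check comes first so an unset value never reaches `>`.
  if parallel_workers && parallel_workers > 1
    ParallelRunnerSketch.new(runner: runner, workers: parallel_workers).run
  else
    runner.run
  end
end

unset_result = run_sketch(:processor, parallel_workers: nil)
one_result   = run_sketch(:processor, parallel_workers: 1)
four_result  = run_sketch(:processor, parallel_workers: 4)
```

An unset value and a value of 1 both fall through to the plain runner; only values above 1 take the parallel path.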