Module: Racecar
Defined in:
lib/racecar/erroneous_state_error.rb,
lib/racecar.rb,
lib/racecar/cli.rb,
lib/racecar/ctl.rb,
lib/racecar/pause.rb,
lib/racecar/config.rb,
lib/racecar/daemon.rb,
lib/racecar/heroku.rb,
lib/racecar/runner.rb,
lib/racecar/datadog.rb,
lib/racecar/message.rb,
lib/racecar/version.rb,
lib/racecar/consumer.rb,
lib/racecar/consumer_set.rb,
lib/racecar/instrumenter.rb,
lib/racecar/parallel_runner.rb,
lib/racecar/null_instrumenter.rb,
lib/racecar/message_delivery_error.rb,
lib/racecar/rails_config_file_loader.rb,
lib/generators/racecar/install_generator.rb,
lib/generators/racecar/consumer_generator.rb
Overview
`rd_kafka_offsets_store()` (et al.) returns an error for any partition that is not currently assigned (through `rd_kafka_*assign()`). This prevents a race condition in which an application stores offsets after the assigned partitions have been revoked (which resets the stored offset). Those stale stored offsets could later be committed when the same partitions were assigned to this consumer again, effectively overwriting any offsets committed by consumers that were assigned the same partitions previously. The typical result is that offsets rewind and messages are reprocessed. As an extra safeguard against this situation, the stored offset is now also reset when partitions are assigned (through `rd_kafka_*assign()`).
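The rule described above can be illustrated with a toy model. This is only a sketch of the behavior, not librdkafka's implementation or API: `ToyOffsetStore`, `NotAssignedError`, and all method names are hypothetical and exist purely for illustration.

```ruby
# Toy model of an offset store that only accepts offsets for partitions
# that are currently assigned. All names here are hypothetical.
class ToyOffsetStore
  class NotAssignedError < StandardError; end

  def initialize
    @stored = {} # partition => stored offset (nil when nothing is stored)
  end

  # Assigning partitions replaces the assignment and resets stored offsets.
  def assign(partitions)
    @stored = partitions.to_h { |partition| [partition, nil] }
  end

  # Revoking drops the assignment together with any stored offsets.
  def revoke
    @stored = {}
  end

  # Storing an offset for an unassigned partition is rejected.
  def store(partition, offset)
    unless @stored.key?(partition)
      raise NotAssignedError, "partition #{partition} is not assigned"
    end
    @stored[partition] = offset
  end

  def stored(partition)
    @stored[partition]
  end
end

store = ToyOffsetStore.new
store.assign([0, 1])
store.store(0, 42)
store.revoke

begin
  store.store(0, 43) # late store after revocation
rescue ToyOffsetStore::NotAssignedError => e
  puts "rejected: #{e.message}"
end
```

Because a late store fails instead of silently succeeding, a stale offset cannot survive until the next assignment and be committed over newer offsets.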
Defined Under Namespace
Modules: Datadog, Generators, Heroku, RailsConfigFileLoader
Classes: Cli, Config, ConfigError, Consumer, ConsumerSet, Ctl, Daemon, ErroneousStateError, Error, Instrumenter, Message, MessageDeliveryError, NullInstrumenter, ParallelRunner, Pause, Runner
Constant Summary
- VERSION = "2.8.1"
Class Method Summary
Class Method Details
.config ⇒ Object
# File 'lib/racecar.rb', line 22
def self.config
  @config ||= Config.new
end
.config=(config) ⇒ Object
# File 'lib/racecar.rb', line 26
def self.config=(config)
  @config = config
end
.configure ⇒ Object
# File 'lib/racecar.rb', line 30
def self.configure
  yield config
end
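The three methods above form a common Ruby pattern: a lazily created, memoized configuration object, a writer to replace it, and a block-style helper that yields it. The sketch below reproduces that pattern with stand-ins so it runs without the gem; `DemoCar` and its two-attribute `Config` are hypothetical and much smaller than the real `Racecar::Config`.

```ruby
# Stand-in module for illustration only; it mirrors the shape of
# .config, .config= and .configure shown above.
module DemoCar
  # Hypothetical, minimal config object with two settings.
  class Config
    attr_accessor :client_id, :group_id

    def initialize
      @client_id = "demo"
      @group_id = nil
    end
  end

  # Created on first access, then reused.
  def self.config
    @config ||= Config.new
  end

  # Replaces the memoized config wholesale.
  def self.config=(config)
    @config = config
  end

  # Yields the shared config so callers can set values in a block.
  def self.configure
    yield config
  end
end

DemoCar.configure do |c|
  c.group_id = "billing"
end

puts DemoCar.config.group_id # prints "billing"
```

Since `configure` yields the memoized object, settings applied in the block are visible to every later call to `config`.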
.instrumenter ⇒ Object
# File 'lib/racecar.rb', line 42
def self.instrumenter
  @instrumenter ||= begin
    default_payload = { client_id: config.client_id, group_id: config.group_id }
    Instrumenter.new(default_payload).tap do |instrumenter|
      if instrumenter.backend == NullInstrumenter
        logger.warn "ActiveSupport::Notifications not available, instrumentation is disabled"
      end
    end
  end
end
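The `@instrumenter ||= begin ... end` form builds the object once and runs the warning check only during that first construction. A minimal sketch of the same idiom follows, assuming stand-in names throughout: `DemoInstrumentation`, `NullBackend`, `DEMO_LOG`, and the `backend:` keyword are hypothetical and do not reflect the real `Instrumenter` constructor.

```ruby
require "logger"
require "stringio"

# Captures log output in memory so the warning can be inspected.
DEMO_LOG = StringIO.new

module DemoInstrumentation
  # Hypothetical stand-in for a no-op backend such as NullInstrumenter.
  NullBackend = Module.new

  # Hypothetical stand-in; the real Instrumenter chooses its own backend.
  class Instrumenter
    attr_reader :default_payload, :backend

    def initialize(default_payload, backend: NullBackend)
      @default_payload = default_payload
      @backend = backend
    end
  end

  def self.logger
    @logger ||= Logger.new(DEMO_LOG)
  end

  # Built on first call; the warning is logged at most once.
  def self.instrumenter
    @instrumenter ||= begin
      default_payload = { client_id: "demo", group_id: "billing" }
      Instrumenter.new(default_payload).tap do |instrumenter|
        if instrumenter.backend == NullBackend
          logger.warn "instrumentation is disabled"
        end
      end
    end
  end
end

first = DemoInstrumentation.instrumenter
second = DemoInstrumentation.instrumenter
puts first.equal?(second)
```

`tap` returns the new instrumenter after running the block, so the side effect (the warning) does not change the memoized value.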
.logger ⇒ Object
# File 'lib/racecar.rb', line 34
def self.logger
  config.logger
end
.logger=(logger) ⇒ Object
# File 'lib/racecar.rb', line 38
def self.logger=(logger)
  config.logger = logger
end
.run(processor) ⇒ Object
# File 'lib/racecar.rb', line 54
def self.run(processor)
  runner = Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter)
  if config.parallel_workers && config.parallel_workers > 1
    ParallelRunner.new(runner: runner, config: config, logger: logger).run
  else
    runner.run
  end
end
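The branch in `.run` wraps the single runner in a parallel runner only when `parallel_workers` is set and greater than 1; a `nil` or `1` value falls through to the plain runner. The sketch below isolates that decision with stand-ins: `DemoConfig`, `DemoRunner`, `DemoParallelRunner`, and `demo_run` are hypothetical and return symbols instead of consuming messages.

```ruby
# Hypothetical config carrying only the setting that drives the branch.
DemoConfig = Struct.new(:parallel_workers)

# Hypothetical stand-in for a single-process runner.
class DemoRunner
  def run
    :single
  end
end

# Hypothetical stand-in that wraps a runner and reports the worker count.
class DemoParallelRunner
  def initialize(runner:, workers:)
    @runner = runner
    @workers = workers
  end

  def run
    [:parallel, @workers]
  end
end

# Mirrors the dispatch above: nil or 1 worker means a plain run.
def demo_run(config)
  runner = DemoRunner.new
  if config.parallel_workers && config.parallel_workers > 1
    DemoParallelRunner.new(runner: runner, workers: config.parallel_workers).run
  else
    runner.run
  end
end

p demo_run(DemoConfig.new(nil))
p demo_run(DemoConfig.new(4))
```

The explicit `config.parallel_workers &&` guard matters because comparing `nil > 1` would raise a `NoMethodError`.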