Class: Rdkafka::Config
- Inherits: Object
  - Object
    - Rdkafka::Config
- Defined in: lib/rdkafka/config.rb
Overview
Configuration for a Kafka consumer or producer. Create an instance, then call its consumer or producer method to create a client. The available configuration options are documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
Defined Under Namespace
Classes: ClientCreationError, ConfigError, NoLoggerError
Constant Summary
- DEFAULT_CONFIG =
Default config that can be overwritten.
{
  # Request api version so advanced features work
  :"api.version.request" => true
}.freeze
- REQUIRED_CONFIG =
Required config that cannot be overwritten.
{
  # Enable log queues so we get callbacks in our own Ruby threads
  :"log.queue" => true
}.freeze
Class Method Summary
-
.error_callback ⇒ Proc?
Returns the current error callback; by default this is nil.
-
.error_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits an error.
-
.log_queue ⇒ Queue
Returns a queue whose contents will be passed to the configured logger.
-
.logger ⇒ Logger
Returns the current logger; by default this is a logger to stdout.
-
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
-
.statistics_callback ⇒ Proc?
Returns the current statistics callback; by default this is nil.
-
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics.
Instance Method Summary
-
#[](key) ⇒ String?
Get a config option with the specified key.
-
#[]=(key, value) ⇒ nil
Set a config option.
-
#admin ⇒ Admin
Create an admin instance with this configuration.
-
#consumer ⇒ Consumer
Create a consumer with this configuration.
-
#consumer_rebalance_listener=(listener) ⇒ Object
Set a listener that is notified of partition assignment and revocation for the subscribed topics.
-
#initialize(config_hash = {}) ⇒ Config
constructor
Returns a new config in which the provided options are merged over DEFAULT_CONFIG, so provided values override the defaults.
-
#producer ⇒ Producer
Create a producer with this configuration.
Constructor Details
#initialize(config_hash = {}) ⇒ Config
Returns a new config in which the provided options are merged over DEFAULT_CONFIG, so provided values override the defaults.
# File 'lib/rdkafka/config.rb', line 113

def initialize(config_hash = {})
  @config_hash = DEFAULT_CONFIG.merge(config_hash)
  @consumer_rebalance_listener = nil
end
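The merge performed by the constructor can be tried with plain hashes. The following is a minimal sketch, assuming only the DEFAULT_CONFIG value listed in the Constant Summary; `default_config`, `user_config`, and the broker address are stand-ins chosen for illustration, and nothing here calls into rdkafka itself.

```ruby
# Copy of the DEFAULT_CONFIG value from the Constant Summary
# (a stand-in local, not the real constant).
default_config = { :"api.version.request" => true }.freeze

# Hypothetical user options; the broker address is a placeholder.
user_config = {
  :"bootstrap.servers" => "localhost:9092",
  :"api.version.request" => false
}

# Same call the constructor makes: on a key clash, the argument's value wins.
merged = default_config.merge(user_config)

puts merged.inspect
```

Because `Hash#merge` returns a new hash, the frozen defaults are left untouched and each config keeps its own copy.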
Class Method Details
.error_callback ⇒ Proc?
Returns the current error callback; by default this is nil.
# File 'lib/rdkafka/config.rb', line 87

def self.error_callback
  @@error_callback
end
.error_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits an error. If this callback is not set, global errors such as brokers becoming unavailable will only be sent to the logger, as defined by librdkafka. The callback is called with an instance of Rdkafka::RdkafkaError.
# File 'lib/rdkafka/config.rb', line 79

def self.error_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
  @@error_callback = callback
end
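The callable check in this setter can be exercised without a Kafka client. The sketch below copies the getter and setter into a stand-in class; `ErrorCallbackSketch` is a hypothetical name, and a plain `StandardError` stands in for the error object the library would pass.

```ruby
# Stand-in that copies the getter and setter from the listings above.
# It is not the real Rdkafka::Config and never talks to librdkafka.
class ErrorCallbackSketch
  @@error_callback = nil

  def self.error_callback
    @@error_callback
  end

  def self.error_callback=(callback)
    raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
    @@error_callback = callback
  end
end

received = []

# Any object that responds to #call is accepted: a lambda, a Proc, a Method.
ErrorCallbackSketch.error_callback = ->(error) { received << error.message }

# Simulate the client reporting an error (assumption: a plain StandardError
# stands in for the library's error instance).
ErrorCallbackSketch.error_callback.call(StandardError.new("broker unavailable"))

# A non-callable value is rejected before it is stored.
rejected = begin
  ErrorCallbackSketch.error_callback = "not callable"
  false
rescue TypeError
  true
end
```

Since the guard runs before the assignment, a rejected value leaves the previously registered callback in place.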
.log_queue ⇒ Queue
Returns a queue whose contents will be passed to the configured logger. Each entry should follow the format [Logger::Severity, String]. The benefit over calling the logger directly is that this is safe to use from trap contexts.
# File 'lib/rdkafka/config.rb', line 39

def self.log_queue
  @@log_queue
end
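The entry format and the hand-off to a logger can be sketched with the standard library alone. This is an illustration under stated assumptions: `log_queue` is a local `Queue` standing in for `Rdkafka::Config.log_queue`, the draining thread is a guess at how such a queue might be consumed, and closing the queue is only a way to end the sketch.

```ruby
require "logger"
require "stringio"

output = StringIO.new
logger = Logger.new(output)

# Local stand-in for Rdkafka::Config.log_queue.
log_queue = Queue.new

# Drain entries of the form [Logger::Severity, String] into the logger.
# Queue#pop returns nil once the queue is closed and empty.
drain = Thread.new do
  while (entry = log_queue.pop)
    severity, message = entry
    logger.add(severity, message)
  end
end

# In real code this push could sit inside a Signal.trap block,
# where calling the logger directly is not safe.
log_queue << [Logger::WARN, "received SIGTERM, shutting down"]

log_queue.close # ends the sketch; hypothetical, not something the library requires
drain.join

puts output.string
```

The point of the indirection is that the producer side only pushes onto a queue, while the logger itself is touched from an ordinary thread.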
.logger ⇒ Logger
Returns the current logger; by default this is a logger to stdout.
# File 'lib/rdkafka/config.rb', line 29

def self.logger
  @@logger
end
.logger=(logger) ⇒ nil
Set the logger that will be used for all logging output by this library.
# File 'lib/rdkafka/config.rb', line 48

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end
.statistics_callback ⇒ Proc?
Returns the current statistics callback; by default this is nil.
# File 'lib/rdkafka/config.rb', line 68

def self.statistics_callback
  @@statistics_callback
end
.statistics_callback=(callback) ⇒ nil
Set a callback that will be called every time the underlying client emits statistics.
You can configure whether and how often this happens using statistics.interval.ms.
The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md
# File 'lib/rdkafka/config.rb', line 60

def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call)
  @@statistics_callback = callback
end
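A statistics callback might look like the sketch below. Several things here are assumptions rather than facts from this page: the hash is assumed to use string keys, as `JSON.parse` would produce; the field names `name`, `msg_cnt`, and `txmsgs` are taken from the linked STATISTICS.md; and `sample_json` is a hand-written sample, not real client output.

```ruby
require "json"

# Hand-written sample for illustration only; real statistics carry many more fields.
sample_json = '{"name":"example#producer-1","msg_cnt":3,"txmsgs":42}'

snapshots = []

# Candidate callback: keep a few top-level counters from each statistics hash.
statistics_callback = lambda do |stats|
  snapshots << {
    client: stats["name"],
    queued_messages: stats["msg_cnt"],
    transmitted_messages: stats["txmsgs"]
  }
end

# Same guard the setter in the listing above applies.
raise TypeError, "Callback has to be callable" unless statistics_callback.respond_to?(:call)

# Simulate a single statistics emission.
statistics_callback.call(JSON.parse(sample_json))

puts snapshots.inspect
```

Keeping the callback small and copying out only the needed fields avoids holding on to the full statistics hash between emissions.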
Instance Method Details
#[](key) ⇒ String?
Get a config option with the specified key.
# File 'lib/rdkafka/config.rb', line 133

def [](key)
  @config_hash[key]
end
#[]=(key, value) ⇒ nil
Set a config option.
# File 'lib/rdkafka/config.rb', line 124

def []=(key, value)
  @config_hash[key] = value
end
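Both accessors delegate straight to the underlying hash, so a lookup must use the same key object type that was used when the option was stored. The sketch below copies `initialize`, `[]`, and `[]=` from the listings on this page into a stand-in class; `ConfigSketch`, the broker address, and the group name are hypothetical.

```ruby
# Stand-in that copies initialize, [] and []= from the listings on this page.
# It is not the real Rdkafka::Config.
class ConfigSketch
  DEFAULT_CONFIG = { :"api.version.request" => true }.freeze

  def initialize(config_hash = {})
    @config_hash = DEFAULT_CONFIG.merge(config_hash)
  end

  def []=(key, value)
    @config_hash[key] = value
  end

  def [](key)
    @config_hash[key]
  end
end

config = ConfigSketch.new({ :"bootstrap.servers" => "localhost:9092" })
config[:"group.id"] = "example-group" # hypothetical group name

symbol_lookup  = config[:"group.id"]            # same Symbol key that was stored
string_lookup  = config["group.id"]             # a String is a different hash key
default_lookup = config[:"api.version.request"] # value inherited from the defaults
```

In this stand-in the String lookup returns nil because no normalization happens between Symbol and String keys; sticking to one key style throughout avoids the mismatch.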
#admin ⇒ Admin
Create an admin instance with this configuration.
# File 'lib/rdkafka/config.rb', line 193

def admin
  opaque = Opaque.new
  config = native_config(opaque)
  Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction)
  Rdkafka::Admin.new(native_kafka(config, :rd_kafka_producer))
end
#consumer ⇒ Consumer
Create a consumer with this configuration.
# File 'lib/rdkafka/config.rb', line 150

def consumer
  opaque = Opaque.new
  config = native_config(opaque)

  if @consumer_rebalance_listener
    opaque.consumer_rebalance_listener = @consumer_rebalance_listener
    Rdkafka::Bindings.rd_kafka_conf_set_rebalance_cb(config, Rdkafka::Bindings::RebalanceCallback)
  end

  kafka = native_kafka(config, :rd_kafka_consumer)

  # Redirect the main queue to the consumer
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)

  # Return consumer with Kafka client
  Rdkafka::Consumer.new(kafka)
end
#consumer_rebalance_listener=(listener) ⇒ Object
Set a listener that is notified of partition assignment and revocation for the subscribed topics.
# File 'lib/rdkafka/config.rb', line 140

def consumer_rebalance_listener=(listener)
  @consumer_rebalance_listener = listener
end
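This page does not show which methods a listener must implement. The sketch below assumes the listener responds to `on_partitions_assigned(consumer, list)` and `on_partitions_revoked(consumer, list)`; both names and their arguments are assumptions that should be checked against the installed version of the library. `RecordingRebalanceListener` is a hypothetical class, and the calls at the end simulate notifications with stand-in values.

```ruby
# Hypothetical listener. The method names and the (consumer, list) arguments
# are assumptions, not confirmed by this page.
class RecordingRebalanceListener
  attr_reader :events

  def initialize
    @events = []
  end

  def on_partitions_assigned(_consumer, list)
    @events << [:assigned, list]
  end

  def on_partitions_revoked(_consumer, list)
    @events << [:revoked, list]
  end
end

listener = RecordingRebalanceListener.new

# Simulated notifications: nil stands in for the consumer and an array of
# strings stands in for the partition list.
listener.on_partitions_assigned(nil, ["example-topic/0"])
listener.on_partitions_revoked(nil, ["example-topic/0"])

puts listener.events.inspect
```

As the `#consumer` source on this page shows, the listener is read when the consumer is created, so it would need to be assigned to the config before `#consumer` is called.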
#producer ⇒ Producer
Create a producer with this configuration.
# File 'lib/rdkafka/config.rb', line 174

def producer
  # Create opaque
  opaque = Opaque.new
  # Create Kafka config
  config = native_config(opaque)
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Callbacks::DeliveryCallbackFunction)
  # Return producer with Kafka client
  Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer)).tap do |producer|
    opaque.producer = producer
  end
end