Class: Rdkafka::Config

Inherits:
Object
  • Object
show all
Defined in:
lib/rdkafka/config.rb

Overview

Configuration for a Kafka consumer or producer. You can create an instance and use the consumer and producer methods to create a client. Documentation of the available configuration options is available on https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Defined Under Namespace

Classes: ClientCreationError, ConfigError, NoLoggerError

Constant Summary collapse

DEFAULT_CONFIG =

Default config that can be overwritten.

{
  # Request api version so advanced features work
  :"api.version.request" => true
}.freeze
REQUIRED_CONFIG =

Required config that cannot be overwritten.

{
  # Enable log queues so we get callbacks in our own Ruby threads
  :"log.queue" => true
}.freeze
@@statistics_callback =
nil

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config_hash = {}) ⇒ Config

Returns a new config with the provided options which are merged with DEFAULT_CONFIG.

Parameters:

  • config_hash (Hash<String,Symbol => String>) (defaults to: {})

    The config options for rdkafka



65
66
67
# File 'lib/rdkafka/config.rb', line 65

def initialize(config_hash = {})
  @config_hash = DEFAULT_CONFIG.merge(config_hash)
end

Class Method Details

.loggerLogger

Returns the current logger, by default this is a logger to stdout.

Returns:

  • (Logger)


15
16
17
# File 'lib/rdkafka/config.rb', line 15

def self.logger
  @@logger
end

.logger=(logger) ⇒ nil

Set the logger that will be used for all logging output by this library.

Parameters:

  • logger (Logger)

    The logger to be used

Returns:

  • (nil)

Raises:



24
25
26
27
# File 'lib/rdkafka/config.rb', line 24

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger=logger
end

.statistics_callbackProc?

Returns the current statistics callback, by default this is nil.

Returns:

  • (Proc, nil)


44
45
46
# File 'lib/rdkafka/config.rb', line 44

def self.statistics_callback
  @@statistics_callback
end

.statistics_callback=(callback) ⇒ nil

Set a callback that will be called every time the underlying client emits statistics. You can configure if and how often this happens using statistics.interval.ms. The callback is called with a hash that's documented here: https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md

Parameters:

  • callback (Proc)

    The callback

Returns:

  • (nil)

Raises:

  • (TypeError)


36
37
38
39
# File 'lib/rdkafka/config.rb', line 36

def self.statistics_callback=(callback)
  raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc
  @@statistics_callback = callback
end

Instance Method Details

#[](key) ⇒ String?

Get a config option with the specified key

Parameters:

  • key (String)

    The config option's key

Returns:

  • (String, nil)

    The config option or nil if it is not present



84
85
86
# File 'lib/rdkafka/config.rb', line 84

def [](key)
  @config_hash[key]
end

#[]=(key, value) ⇒ nil

Set a config option.

Parameters:

  • key (String)

    The config option's key

  • value (String)

    The config option's value

Returns:

  • (nil)


75
76
77
# File 'lib/rdkafka/config.rb', line 75

def []=(key, value)
  @config_hash[key] = value
end

#consumerConsumer

Create a consumer with this configuration.

Returns:

Raises:



94
95
96
97
98
99
100
# File 'lib/rdkafka/config.rb', line 94

def consumer
  kafka = native_kafka(native_config, :rd_kafka_consumer)
  # Redirect the main queue to the consumer
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
  # Return consumer with Kafka client
  Rdkafka::Consumer.new(kafka)
end

#producerProducer

Create a producer with this configuration.

Returns:

Raises:



108
109
110
111
112
113
114
115
# File 'lib/rdkafka/config.rb', line 108

def producer
  # Create Kafka config
  config = native_config
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback)
  # Return producer with Kafka client
  Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer))
end