Class: Rdkafka::Config

Inherits:
Object
  • Object
Defined in:
lib/rdkafka/config.rb

Overview

Configuration for a Kafka consumer or producer. Create an instance, then call its consumer or producer method to create a client. The available configuration options are documented at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Defined Under Namespace

Classes: ClientCreationError, ConfigError, NoLoggerError

Constant Summary

DEFAULT_CONFIG =

Default config that can be overwritten.

{
  # Request api version so advanced features work
  :"api.version.request" => true
}.freeze
REQUIRED_CONFIG =

Required config that cannot be overwritten.

{
  # Enable log queues so we get callbacks in our own Ruby threads
  :"log.queue" => true
}.freeze
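
The two constants imply a precedence order: defaults can be replaced by user options, while required options win over both. The sketch below illustrates one way to get that layering with plain hashes. The `effective_config` helper is hypothetical, and the page does not show how the library applies REQUIRED_CONFIG internally, so the final merge step is an assumption.

```ruby
# Stand-in constants mirroring the two hashes documented above.
DEFAULT_CONFIG = { :"api.version.request" => true }.freeze
REQUIRED_CONFIG = { :"log.queue" => true }.freeze

# Hypothetical helper (not part of rdkafka): with Hash#merge the later hash
# wins, so required options are merged last and cannot be overridden.
def effective_config(user_config)
  DEFAULT_CONFIG.merge(user_config).merge(REQUIRED_CONFIG)
end

effective = effective_config({
  :"api.version.request" => false,         # replaces a default
  :"log.queue" => false,                   # tries to replace a required option
  :"bootstrap.servers" => "localhost:9092" # plain user option
})

puts effective.inspect
```

Under these assumptions the user's `api.version.request` value is kept, while the attempt to disable `log.queue` is discarded.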

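Class Method Summary

  • .logger ⇒ Logger
  • .logger=(logger) ⇒ nil

Instance Method Summary

  • #[](key) ⇒ String?
  • #[]=(key, value) ⇒ nil
  • #consumer ⇒ Consumer
  • #producer ⇒ Producer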

Constructor Details

#initialize(config_hash = {}) ⇒ Config

Returns a new config in which the provided options are merged over DEFAULT_CONFIG, so a provided option replaces a default with the same key.

Parameters:

  • config_hash (Hash<String,Symbol => String>) (defaults to: {})

    The config options for rdkafka



# File 'lib/rdkafka/config.rb', line 45

def initialize(config_hash = {})
  @config_hash = DEFAULT_CONFIG.merge(config_hash)
end
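
DEFAULT_CONFIG is frozen, yet the config built from it can still be modified. That follows from `Hash#merge` returning a new hash rather than changing its receiver. A minimal sketch with a local stand-in for the constant (`defaults` and the `group.id` value are illustrative only):

```ruby
# Stand-in for the frozen DEFAULT_CONFIG constant documented above.
defaults = { :"api.version.request" => true }.freeze

# Hash#merge builds a new hash and leaves the frozen receiver untouched.
merged = defaults.merge({
  :"api.version.request" => false,
  :"group.id" => "example-group"
})

# The merged copy is not frozen, so later assignments are allowed.
merged[:"client.id"] = "example-client"

puts "defaults frozen: #{defaults.frozen?}, merged frozen: #{merged.frozen?}"
```

Each Config instance therefore gets its own hash, and changing one instance does not affect the shared defaults.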

Class Method Details

.logger ⇒ Logger

Returns the current logger. By default this is a logger that writes to stdout.

Returns:

  • (Logger)


# File 'lib/rdkafka/config.rb', line 14

def self.logger
  @@logger
end

.logger=(logger) ⇒ nil

Set the logger that will be used for all logging output by this library.

Parameters:

  • logger (Logger)

    The logger to be used

Returns:

  • (nil)

Raises:

  • (NoLoggerError)

    If the logger is nil

# File 'lib/rdkafka/config.rb', line 23

def self.logger=(logger)
  raise NoLoggerError if logger.nil?
  @@logger = logger
end
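
The logger accessors can be exercised without a Kafka connection. The sketch below reproduces them in a hypothetical stand-in class, `SketchConfig`, so it runs without the gem. The stdout default follows the description above; the `RuntimeError` superclass of the error is an assumption.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for the class-level logger accessors shown above.
class SketchConfig
  # Assumption: the real error class may have a different superclass.
  class NoLoggerError < RuntimeError; end

  @@logger = Logger.new($stdout)

  def self.logger
    @@logger
  end

  def self.logger=(logger)
    raise NoLoggerError if logger.nil?
    @@logger = logger
  end
end

# Route log output to an in-memory buffer instead of stdout.
buffer = StringIO.new
SketchConfig.logger = Logger.new(buffer)
SketchConfig.logger.info("connected")

# Assigning nil is rejected, and the previous logger stays in place.
rejected = begin
  SketchConfig.logger = nil
  false
rescue SketchConfig::NoLoggerError
  true
end
```

With the library itself, the equivalent call would be `Rdkafka::Config.logger = Logger.new(...)` with whatever log destination the application uses.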

Instance Method Details

#[](key) ⇒ String?

Get the config option with the specified key.

Parameters:

  • key (String)

    The config option's key

Returns:

  • (String, nil)

    The config option or nil if it is not present



# File 'lib/rdkafka/config.rb', line 64

def [](key)
  @config_hash[key]
end

#[]=(key, value) ⇒ nil

Set a config option.

Parameters:

  • key (String)

    The config option's key

  • value (String)

    The config option's value

Returns:

  • (nil)


# File 'lib/rdkafka/config.rb', line 55

def []=(key, value)
  @config_hash[key] = value
end
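
Both accessors delegate to a plain Hash, so a Symbol key and a String key with the same text are separate entries. Since DEFAULT_CONFIG uses Symbol keys, looking up a default by String returns nil. A minimal sketch using a hypothetical stand-in class, `KeyedConfig`, that copies the constructor and accessors shown on this page:

```ruby
# Hypothetical stand-in reproducing the constructor and accessors shown above.
class KeyedConfig
  DEFAULT_CONFIG = { :"api.version.request" => true }.freeze

  def initialize(config_hash = {})
    @config_hash = DEFAULT_CONFIG.merge(config_hash)
  end

  def [](key)
    @config_hash[key]
  end

  def []=(key, value)
    @config_hash[key] = value
  end
end

config = KeyedConfig.new
config[:"group.id"] = "example-group"

symbol_lookup = config[:"api.version.request"] # matches the Symbol default
string_lookup = config["api.version.request"]  # different key, so no match

puts "symbol: #{symbol_lookup.inspect}, string: #{string_lookup.inspect}"
```

Using one key type consistently avoids ending up with two entries for the same option.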

#consumer ⇒ Consumer

Create a consumer with this configuration.

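Returns:

  • (Consumer)

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created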



# File 'lib/rdkafka/config.rb', line 74

def consumer
  kafka = native_kafka(native_config, :rd_kafka_consumer)
  # Redirect the main queue to the consumer
  Rdkafka::Bindings.rd_kafka_poll_set_consumer(kafka)
  # Return consumer with Kafka client
  Rdkafka::Consumer.new(kafka)
end

#producer ⇒ Producer

Create a producer with this configuration.

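Returns:

  • (Producer)

Raises:

  • (ConfigError)

    When the configuration contains invalid options

  • (ClientCreationError)

    When the native client cannot be created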



# File 'lib/rdkafka/config.rb', line 88

def producer
  # Create Kafka config
  config = native_config
  # Set callback to receive delivery reports on config
  Rdkafka::Bindings.rd_kafka_conf_set_dr_msg_cb(config, Rdkafka::Bindings::DeliveryCallback)
  # Return producer with Kafka client
  Rdkafka::Producer.new(native_kafka(config, :rd_kafka_producer))
end