Class: GlassOctopus::RubyKafkaAdapter

Inherits:
Object
Defined in:
lib/glass_octopus/connection/ruby_kafka_adapter.rb

Overview

Connection adapter that uses the ruby-kafka gem to talk to Kafka 0.9+.

Examples:

adapter = GlassOctopus::RubyKafkaAdapter.new do |kafka_config|
  kafka_config.broker_list = %w[localhost:9092]
  kafka_config.topic       = "mytopic"
  kafka_config.group_id    = "mygroup"
end

adapter.connect.fetch_message do |message|
  p message
end

Constructor Details

#initialize(logger = nil) { ... } ⇒ RubyKafkaAdapter

Returns a new instance of RubyKafkaAdapter.

Yields:

  • configure ruby-kafka in the yielded block.

    The following configuration values are required:

    • broker_list: list of Kafka broker addresses

    • topic: name of the topic to subscribe to

    • group_id: name of the consumer group

    • client_id: the identifier for this application

    Optional configuration:

    • client: a hash passed on to Kafka.new

    • consumer: a hash passed on to kafka.consumer

    • subscription: a hash passed on to consumer.subscribe

    Check the ruby-kafka documentation for driver-specific configuration options.
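A configuration block that also sets the optional hashes might look like the sketch below. It uses a plain OpenStruct as a stand-in for the yielded object so that it runs without a broker. The `configure` lambda and the specific option values (`connect_timeout`, `session_timeout`, `start_from_beginning`) are illustrative assumptions based on ruby-kafka's keyword arguments, not settings prescribed by this adapter.

```ruby
require "ostruct"

# Hypothetical configuration block; in real code the same body would be
# passed to GlassOctopus::RubyKafkaAdapter.new.
configure = lambda do |kafka_config|
  # Required values
  kafka_config.broker_list = %w[localhost:9092]
  kafka_config.topic       = "mytopic"
  kafka_config.group_id    = "mygroup"
  kafka_config.client_id   = "myapp"

  # Optional values (example settings only)
  kafka_config.client       = { connect_timeout: 5 }          # for Kafka.new
  kafka_config.consumer     = { session_timeout: 30 }         # for kafka.consumer
  kafka_config.subscription = { start_from_beginning: false } # for consumer.subscribe
end

# Stand-in for the object the constructor yields.
config = OpenStruct.new
configure.call(config)
options = config.to_h

p options.keys
```

Keeping driver settings in three separate hashes means each one maps onto exactly one ruby-kafka call, so an option never has to be routed by name.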

Raises:



# File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 44

def initialize(logger=nil)
  config = OpenStruct.new
  yield config

  @options = config.to_h
  @options[:group_id] ||= @options[:group]
  @options[:logger] ||= logger
  validate_options

  @kafka = nil
  @consumer = nil
end
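The two `||=` lines in the constructor give `group_id` and `logger` fallback values. The sketch below reproduces just that step in a hypothetical `normalize` helper (the helper name is an assumption, and `validate_options` is deliberately left out because its body is not shown here).

```ruby
require "logger"
require "ostruct"

# Hypothetical helper reproducing the two ||= lines from the constructor.
def normalize(config, logger = nil)
  options = config.to_h
  options[:group_id] ||= options[:group] # `group` acts as an alias
  options[:logger]   ||= logger          # constructor argument is the fallback
  options
end

fallback_logger = Logger.new($stderr)
own_logger      = Logger.new($stdout)

# Only `group` is set, so group_id is filled in from it.
aliased = normalize(OpenStruct.new(group: "mygroup"), fallback_logger)

# An explicit group_id and a configured logger both take precedence.
explicit = normalize(
  OpenStruct.new(group: "old", group_id: "new", logger: own_logger),
  fallback_logger
)

p aliased[:group_id]
p explicit[:group_id]
```

In other words, values set inside the block always win; the `logger` argument and the `group` key are only consulted when the block leaves `logger` or `group_id` unset.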

Instance Attribute Details

#options ⇒ Hash (readonly)

A hash that holds the configuration set up in the initializer block.

Returns:

  • (Hash)


# File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 24

def options
  @options
end

Instance Method Details

#connect ⇒ void

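This method is documented as returning no meaningful value. The implementation returns self, which is what allows chained calls such as adapter.connect.fetch_message.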

Connect to Kafka and join the consumer group.



# File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 59

def connect
  @kafka = connect_to_kafka
  @consumer = create_consumer(@kafka)
  @consumer.subscribe(
    options.fetch(:topic),
    **options.fetch(:subscription, {})
  )

  self
end
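The subscribe call relies on `Hash#fetch` in two ways: a missing `:topic` raises `KeyError`, while a missing `:subscription` falls back to an empty hash that the double splat turns into no keyword arguments at all. The sketch below shows this with a hypothetical `RecordingConsumer` stand-in and a `subscribe_with` helper; both names are assumptions made for illustration and are not part of the adapter or of ruby-kafka.

```ruby
# Hypothetical stand-in for a ruby-kafka consumer; it only records calls.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(topic, **subscription_options)
    @calls << [topic, subscription_options]
  end
end

# Same call shape as in #connect, extracted into a helper for illustration.
def subscribe_with(consumer, options)
  consumer.subscribe(
    options.fetch(:topic),
    **options.fetch(:subscription, {})
  )
end

consumer = RecordingConsumer.new
subscribe_with(consumer, { topic: "mytopic" })
subscribe_with(consumer, { topic: "mytopic", subscription: { start_from_beginning: false } })

# A missing topic fails immediately because Hash#fetch raises KeyError.
missing_topic_raises =
  begin
    subscribe_with(consumer, {})
    false
  rescue KeyError
    true
  end

p consumer.calls
p missing_topic_raises
```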

#fetch_message {|message| ... } ⇒ Object

Fetch messages from Kafka in a loop.

Yields:

  • messages read from Kafka

Yield Parameters:

  • message (Message)

    a Kafka message



# File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 73

def fetch_message
  @consumer.each_message do |fetched_message|
    message = Message.new(
      fetched_message.topic,
      fetched_message.partition,
      fetched_message.offset,
      fetched_message.key,
      fetched_message.value
    )

    yield message
  end
end
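The conversion step can be tried without a broker. In the sketch below, `Message` is assumed to behave like a Struct with the five positional fields used above, and `FetchedMessage` and `ListConsumer` are hypothetical stand-ins for the ruby-kafka objects. Unlike a real consumer, the stand-in stops once its list is exhausted.

```ruby
# Assumption: Message accepts these five fields positionally, as in the source above.
Message = Struct.new(:topic, :partition, :offset, :key, :value)

# Hypothetical stand-in for the message objects ruby-kafka yields.
FetchedMessage = Struct.new(:topic, :partition, :offset, :key, :value)

# Hypothetical stand-in for a consumer; iterates a fixed list, then returns.
class ListConsumer
  def initialize(messages)
    @messages = messages
  end

  def each_message
    @messages.each { |fetched_message| yield fetched_message }
  end
end

# Same shape as #fetch_message, with the consumer passed in explicitly.
def fetch_message(consumer)
  consumer.each_message do |fetched_message|
    yield Message.new(
      fetched_message.topic,
      fetched_message.partition,
      fetched_message.offset,
      fetched_message.key,
      fetched_message.value
    )
  end
end

consumer = ListConsumer.new([
  FetchedMessage.new("mytopic", 0, 0, nil, "hello"),
  FetchedMessage.new("mytopic", 0, 1, "k1", "world")
])

received = []
fetch_message(consumer) { |message| received << message }

p received.map(&:value)
```

Wrapping each fetched message in the adapter's own `Message` type keeps calling code independent of the underlying driver's message class.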