Class: GlassOctopus::RubyKafkaAdapter
- Inherits: Object
  - Object
    - GlassOctopus::RubyKafkaAdapter
- Defined in: lib/glass_octopus/connection/ruby_kafka_adapter.rb
Overview
Connection adapter that uses the ruby-kafka gem to talk to Kafka 0.9+.
Instance Attribute Summary
- #options ⇒ Hash (readonly)
  A hash that holds the configuration set up in the initializer block.
Instance Method Summary
- #connect ⇒ void
  Connect to Kafka and join the consumer group.
- #fetch_message {|message| ... } ⇒ Object
  Fetch messages from Kafka in a loop.
- #initialize(logger = nil) { ... } ⇒ RubyKafkaAdapter (constructor)
  A new instance of RubyKafkaAdapter.
Constructor Details
#initialize(logger = nil) { ... } ⇒ RubyKafkaAdapter
Returns a new instance of RubyKafkaAdapter.
    # File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 44

    def initialize(logger=nil)
      config = OpenStruct.new
      yield config

      @options = config.to_h
      @options[:group_id] ||= @options[:group]
      @options[:logger] ||= logger

      @kafka = nil
      @consumer = nil
    end
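The constructor collects settings on an OpenStruct inside the block and then converts them to a plain hash, filling in two defaults. The following is a minimal standalone sketch of that pattern, not the adapter itself: `build_options` is a hypothetical helper name, and the `topic` and `group` values are made up for illustration.

```ruby
require "ostruct"

# Hypothetical helper mirroring the constructor's option handling.
def build_options(logger = nil)
  config = OpenStruct.new
  yield config                              # caller assigns settings on the struct

  options = config.to_h                     # symbol-keyed hash of everything assigned
  options[:group_id] ||= options[:group]    # :group acts as a fallback for :group_id
  options[:logger] ||= logger               # the argument is used only if the block set no logger
  options
end

options = build_options(:fallback_logger) do |config|
  config.topic = "events"
  config.group = "billing"
end

puts options[:group_id]  # billing
```

Because both defaults use `||=`, a value assigned inside the block always takes precedence over the fallback.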
Instance Attribute Details
#options ⇒ Hash (readonly)
A hash that holds the configuration set up in the initializer block.

    # File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 24

    def options
      @options
    end
Instance Method Details
#connect ⇒ void
This method returns an undefined value.
Connect to Kafka and join the consumer group.
    # File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 59

    def connect
      @kafka = connect_to_kafka
      @consumer = create_consumer(@kafka)
      @consumer.subscribe(
        options.fetch(:topic),
        **options.fetch(:subscription, {})
      )

      self
    end
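The subscription step reads a required `:topic` option and an optional `:subscription` hash that is passed on as keyword arguments. A rough sketch of that behavior follows, using a stand-in consumer instead of ruby-kafka so it runs on its own. `RecordingConsumer` and `subscribe_with` are hypothetical names, and `start_from_beginning` is used only as an example keyword.

```ruby
# Stand-in for the real consumer; it only records what subscribe receives.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def subscribe(topic, **subscription)
    @calls << [topic, subscription]
  end
end

# Hypothetical helper that mirrors the subscribe call made in #connect.
def subscribe_with(consumer, options)
  consumer.subscribe(
    options.fetch(:topic),               # raises KeyError when :topic is missing
    **options.fetch(:subscription, {})   # optional keywords, empty by default
  )
end

consumer = RecordingConsumer.new
subscribe_with(consumer, { topic: "events" })
subscribe_with(consumer, { topic: "events", subscription: { start_from_beginning: false } })
```

Using `fetch` without a default for `:topic` means a missing topic fails immediately with a `KeyError` rather than subscribing to `nil`.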
#fetch_message {|message| ... } ⇒ Object
Fetch messages from Kafka in a loop.

    # File 'lib/glass_octopus/connection/ruby_kafka_adapter.rb', line 73

    def fetch_message
      @consumer.each_message do |fetched_message|
        message = Message.new(
          fetched_message.topic,
          fetched_message.partition,
          fetched_message.offset,
          fetched_message.key,
          fetched_message.value
        )

        yield message
      end
    end
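The fetch loop wraps each message from the underlying consumer in the library's own message type before yielding it to the caller. The sketch below illustrates that wrap-and-yield pattern with stand-ins so it runs without a broker. `SketchMessage`, `RawMessage`, `FakeConsumer`, and `fetch_messages` are hypothetical names, and the struct-based message is an assumption about its shape, not the library's actual definition.

```ruby
# Assumed shape of the yielded message: the five fields read in the loop.
SketchMessage = Struct.new(:topic, :partition, :offset, :key, :value)

# Stand-in for what the underlying consumer hands over (it may carry extra fields).
RawMessage = Struct.new(:topic, :partition, :offset, :key, :value, :headers)

# Stand-in consumer that replays a fixed list instead of polling Kafka.
class FakeConsumer
  def initialize(raw_messages)
    @raw_messages = raw_messages
  end

  def each_message
    @raw_messages.each { |raw| yield raw }
  end
end

# Hypothetical helper mirroring the wrap-and-yield loop.
def fetch_messages(consumer)
  consumer.each_message do |raw|
    yield SketchMessage.new(raw.topic, raw.partition, raw.offset, raw.key, raw.value)
  end
end

consumer = FakeConsumer.new([
  RawMessage.new("events", 0, 41, "user-1", "signed_up", {}),
  RawMessage.new("events", 0, 42, "user-2", "logged_in", {})
])

fetch_messages(consumer) do |message|
  puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
end
```

Copying the fields into a separate type keeps callers independent of the client library's message class.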