Class: Stapfen::Client::Kafka
Inherits: Object
- Object
- Stapfen::Client::Kafka
Defined in: lib/stapfen/client/kafka.rb
Instance Attribute Summary
- #connection ⇒ Object (readonly)
  Returns the value of attribute connection.
- #producer ⇒ Object (readonly)
  Returns the value of attribute producer.
Instance Method Summary
- #can_unreceive? ⇒ Boolean
  Always false: this client cannot unreceive messages.
- #close ⇒ Boolean
  Closes the consumer threads created by Kafka.
- #connect(*args) ⇒ Object
  Not implemented; this method is a no-op.
- #initialize(configuration) ⇒ Kafka (constructor)
  Initializes a Kafka client object.
- #runloop ⇒ Object
- #subscribe(destination, headers = {}, &block) ⇒ Object
  Subscribes to a destination (i.e. a Kafka topic) and consumes messages.
Constructor Details
#initialize(configuration) ⇒ Kafka
Initializes a Kafka client object. Raises ConfigurationError unless the configuration provides both :groupId and :zookeepers.
# File 'lib/stapfen/client/kafka.rb', line 25
def initialize(configuration)
  super()
  @config = configuration
  @topic = @config[:topic]
  @groupId = @config[:groupId]
  @zookeepers = @config[:zookeepers]
  raise ConfigurationError unless @groupId && @zookeepers
  @connection = Hermann::Consumer.new(@topic, @groupId, @zookeepers)
end
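The constructor's guard can be illustrated without a Kafka cluster. The sketch below is not part of Stapfen: `valid_kafka_config?` is a hypothetical helper, and the topic, group, and host values are placeholders. Only the keys `:topic`, `:groupId`, and `:zookeepers` come from the source above.

```ruby
# Hypothetical configuration hash; the keys mirror those read by #initialize.
config = {
  topic: 'example-topic',        # placeholder topic name
  groupId: 'example-group',      # placeholder consumer group
  zookeepers: 'localhost:2181'   # placeholder ZooKeeper address
}

# Hypothetical helper mirroring the constructor's guard:
# both :groupId and :zookeepers must be present (truthy).
def valid_kafka_config?(config)
  !!(config[:groupId] && config[:zookeepers])
end

complete = valid_kafka_config?(config)
missing_group = valid_kafka_config?(config.reject { |key, _| key == :groupId })
```

Note that the guard, as shown in the source, does not check `:topic`, so a configuration without a topic would pass this check.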
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/stapfen/client/kafka.rb', line 15
def connection
  @connection
end
#producer ⇒ Object (readonly)
Returns the value of attribute producer.
# File 'lib/stapfen/client/kafka.rb', line 15
def producer
  @producer
end
Instance Method Details
#can_unreceive? ⇒ Boolean
Cannot unreceive
# File 'lib/stapfen/client/kafka.rb', line 41
def can_unreceive?
  false
end
#close ⇒ Boolean
Closes the consumer threads created by Kafka. Returns false if there is no connection to close, true otherwise.
# File 'lib/stapfen/client/kafka.rb', line 49
def close
  return false unless @connection
  @connection.shutdown
  @connection = nil
  return true
end
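The return values of #close can be seen with a small stand-in. This is a sketch under stated assumptions, not Stapfen code: `CountingConsumer` and `ClosableClient` are hypothetical classes, and only the body of `close` is copied from the listing above.

```ruby
# Hypothetical stand-in for the Kafka consumer; only #shutdown is modelled.
class CountingConsumer
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def shutdown
    @shutdown_calls += 1
  end
end

# Hypothetical holder that reproduces the logic of #close shown above.
class ClosableClient
  def initialize(connection)
    @connection = connection
  end

  def close
    return false unless @connection
    @connection.shutdown
    @connection = nil
    true
  end
end

consumer = CountingConsumer.new
client = ClosableClient.new(consumer)
first = client.close   # connection present: shut down, then cleared
second = client.close  # connection already cleared: nothing to do
```

Because the connection is set to nil after the first call, a repeated call returns false and does not invoke `shutdown` again.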
#connect(*args) ⇒ Object
Not implemented; this method is a no-op.
# File 'lib/stapfen/client/kafka.rb', line 36
def connect(*args)
  # No-op
end
#runloop ⇒ Object
# File 'lib/stapfen/client/kafka.rb', line 68
def runloop
  loop do
    sleep 1
  end
end
#subscribe(destination, headers = {}, &block) ⇒ Object
Subscribes to a destination (i.e. a Kafka topic) and consumes messages.
# File 'lib/stapfen/client/kafka.rb', line 63
def subscribe(destination, headers={}, &block)
  destination = Stapfen::Destination.from_string(destination)
  connection.consume(destination.as_kafka, &block)
end
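The way #subscribe hands its block to the consumer can be sketched with stand-ins. Everything below is hypothetical: `RecordingConsumer` and `SketchClient` are illustrative classes, the messages are canned strings, and the `Stapfen::Destination` parsing step is deliberately omitted, so the destination string is passed through unchanged.

```ruby
# Hypothetical stand-in for the consumer: records the topic it was asked
# to consume and yields a fixed list of messages to the block.
class RecordingConsumer
  attr_reader :topics

  def initialize(messages)
    @messages = messages
    @topics = []
  end

  def consume(topic, &block)
    @topics << topic
    @messages.each(&block)
  end
end

# Hypothetical client with the same shape as #subscribe above,
# minus the Destination parsing step.
class SketchClient
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  def subscribe(destination, headers = {}, &block)
    connection.consume(destination, &block)
  end
end

consumer = RecordingConsumer.new(%w[first second])
client = SketchClient.new(consumer)

received = []
client.subscribe('example-topic') { |message| received << message }
```

As in the listing above, the `headers` argument is accepted but not used by the sketch.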