Class: Kafka::Consumer
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Kafka::Consumer
- Defined in:
- lib/jruby-kafka/consumer.rb
Instance Method Summary collapse
-
#commitOffsets ⇒ Object
Commit the offsets of all topic partitions connected by this consumer.
-
#connect ⇒ Object
Create connection to Kafka and Zookeeper.
-
#initialize(config = {}) ⇒ Consumer
constructor
Create a Kafka high-level consumer.
-
#message_streams ⇒ Array<Java::KafkaConsumer::KafkaStream>
Start fetching messages.
-
#shutdown ⇒ Object
Shutdown the consumer.
Constructor Details
#initialize(config = {}) ⇒ Consumer
Create a Kafka high-level consumer.
One and only one of :topic, :include_topics, or :exclude_topics must be provided.
For other configuration properties and their default values see kafka.apache.org/08/configuration.html#consumerconfigs and github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/consumer/ConsumerConfig.scala#L90-L182.
38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/jruby-kafka/consumer.rb', line 38
#
# Create a Kafka high-level consumer.
#
# The options listed below are removed from a copy of +config+ and kept on
# the instance; whatever remains is stored in @properties. The caller's hash
# is not modified.
#
# @param config [Hash] consumer configuration
# @option config [Object] :topic           topic to consume
# @option config [Object] :include_topics  topic filter used when :topic is absent
# @option config [Object] :exclude_topics  topic filter used when neither of the above is given
# @option config [Integer] :num_streams    number of streams (defaults to 1)
# @option config [String] :key_decoder     decoder class name
#   (defaults to 'kafka.serializer.DefaultDecoder')
# @option config [String] :msg_decoder     decoder class name
#   (defaults to 'kafka.serializer.DefaultDecoder')
# @option config [Object] :reset_beginning stored as given
#
# NOTE(review): validate_arguments is defined outside this snippet; presumably
# it enforces that exactly one of :topic, :include_topics or :exclude_topics
# is supplied - confirm against its definition.
def initialize(config = {})
  validate_arguments(config)

  # Work on a copy so the deletes below never touch the caller's hash.
  @properties = config.clone

  @topic          = @properties.delete(:topic)
  @include_topics = @properties.delete(:include_topics)
  @exclude_topics = @properties.delete(:exclude_topics)

  stream_count = @properties.delete(:num_streams) || 1
  @num_streams = stream_count.to_java(Java::int)

  @key_decoder = @properties.delete(:key_decoder) || 'kafka.serializer.DefaultDecoder'
  @msg_decoder = @properties.delete(:msg_decoder) || 'kafka.serializer.DefaultDecoder'

  @reset_beginning = @properties.delete(:reset_beginning)

  # No connection yet; #connect assigns the connector.
  @consumer = nil
end
Instance Method Details
#commitOffsets ⇒ Object
Commit the offsets of all topic partitions connected by this consumer.
Useful when the :auto_commit_enable configuration parameter is false.
105 106 107 |
# File 'lib/jruby-kafka/consumer.rb', line 105
#
# Commit the offsets of all topic partitions connected by this consumer.
#
# Delegates to the underlying connector held in @consumer. When no connector
# exists (@consumer is nil, e.g. before #connect or after #shutdown) there is
# nothing to commit, so the call is a no-op instead of raising NoMethodError
# on nil. This is the same nil guard #shutdown applies.
#
# @return [Object, nil] whatever the connector's commitOffsets returns, or
#   nil when there is no connector
def commitOffsets
  @consumer.commitOffsets if @consumer
end
#connect ⇒ Object
Create connection to Kafka and Zookeeper.
55 56 57 |
# File 'lib/jruby-kafka/consumer.rb', line 55
#
# Create connection to Kafka and Zookeeper.
#
# Converts @properties with Kafka::Utility.java_properties, wraps the result
# in a ConsumerConfig, and stores the connector returned by
# Consumer.createJavaConsumerConnector in @consumer.
#
# @return [Object] the newly created connector (also assigned to @consumer)
def connect
  java_props = Kafka::Utility.java_properties(@properties)
  consumer_config = ConsumerConfig.new(java_props)
  @consumer = Consumer.createJavaConsumerConnector(consumer_config)
end
#message_streams ⇒ Array<Java::KafkaConsumer::KafkaStream>
Note: KafkaStreams instances are not thread-safe.
Start fetching messages.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/jruby-kafka/consumer.rb', line 68
#
# Start fetching messages.
#
# Connects first if no connector exists yet. When @reset_beginning equals
# 'from-beginning', the Zookeeper path "/consumers/<group_id>" is passed to
# ZkUtils.maybeDeletePath before any stream is created. Streams are then
# created either for the single @topic or, when @topic is unset, through a
# Whitelist built from @include_topics (or a Blacklist built from
# @exclude_topics when @include_topics is unset).
#
# @note KafkaStreams instances are not thread-safe.
#
# @return [Array<Java::KafkaConsumer::KafkaStream>] the created streams
# @raise [KafkaError] when the Zookeeper path removal raises ZkException
def message_streams
  connect if @consumer.nil?

  # Only the Zookeeper cleanup is wrapped: a ZkException is re-raised as
  # KafkaError, while errors from stream creation propagate unchanged.
  begin
    if @reset_beginning == 'from-beginning'
      ZkUtils.maybeDeletePath(@properties[:zookeeper_connect], "/consumers/#{@properties[:group_id]}")
    end
  rescue ZkException => e
    raise KafkaError.new(e), "Got ZkException: #{e}"
  end

  # Decoders are instantiated reflectively from their class names, through the
  # constructor taking kafka.utils.VerifiableProperties, with nil as argument.
  key_decoder_i = Java::JavaClass.for_name(@key_decoder).
    constructor('kafka.utils.VerifiableProperties').new_instance nil
  msg_decoder_i = Java::JavaClass.for_name(@msg_decoder).
    constructor('kafka.utils.VerifiableProperties').new_instance nil

  if @topic
    topic_count_map = java.util.HashMap.new @topic => @num_streams
    @consumer.
      createMessageStreams(topic_count_map, key_decoder_i, msg_decoder_i)[@topic].
      to_a
  else
    filter = @include_topics ? Whitelist.new(@include_topics) : Blacklist.new(@exclude_topics)
    @consumer.
      createMessageStreamsByFilter(filter, @num_streams, key_decoder_i, msg_decoder_i).
      to_a
  end
end
#shutdown ⇒ Object
Shutdown the consumer.
112 113 114 115 116 |
# File 'lib/jruby-kafka/consumer.rb', line 112
#
# Shutdown the consumer.
#
# Calls shutdown on the connector when one exists, then drops the reference
# so that @consumer is nil afterwards.
#
# @return [nil] always nil, regardless of what the connector's shutdown returns
def shutdown
  # Skip the call entirely when there is no connector.
  @consumer && @consumer.shutdown
  # Clearing the reference evaluates to nil, which is also the return value.
  @consumer = nil
end