Class: Kafka::Consumer
- Inherits:
-
Object
- Object
- Kafka::Consumer
- Defined in:
- lib/jruby-kafka/consumer.rb
Instance Method Summary collapse
-
#commitOffsets ⇒ Object
Commit the offsets of all topic partitions connected by this consumer.
-
#initialize(config = {}) ⇒ Consumer
constructor
Create a Kafka high-level consumer.
-
#message_streams ⇒ Array<Java::KafkaConsumer::KafkaStream>
Start fetching messages.
-
#shutdown ⇒ Object
Shutdown the consumer.
Constructor Details
#initialize(config = {}) ⇒ Consumer
Create a Kafka high-level consumer.
One and only one of :topic, :include_topics, or :exclude_topics must be provided.
For other configuration properties and their default values see kafka.apache.org/08/configuration.html#consumerconfigs and github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/consumer/ConsumerConfig.scala#L90-L182.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/jruby-kafka/consumer.rb', line 38 def initialize(config={}) validate_arguments config @properties = config.clone @topic = @properties.delete :topic @include_topics = @properties.delete :include_topics @exclude_topics = @properties.delete :exclude_topics @num_streams = (@properties.delete(:num_streams) || 1).to_java Java::int @key_decoder = @properties.delete(:key_decoder) || 'kafka.serializer.DefaultDecoder' @msg_decoder = @properties.delete(:msg_decoder) || 'kafka.serializer.DefaultDecoder' @reset_beginning = @properties.delete :reset_beginning @consumer = Consumer.createJavaConsumerConnector ConsumerConfig.new Kafka::Utility.java_properties @properties end |
Instance Method Details
#commitOffsets ⇒ Object
Commit the offsets of all topic partitions connected by this consumer.
Useful for when the :auto_commit_enable configuration parameter is false.
98 99 100 |
# File 'lib/jruby-kafka/consumer.rb', line 98 def commitOffsets @consumer.commitOffsets end |
#message_streams ⇒ Array<Java::KafkaConsumer::KafkaStream>
Note:
KafkaStreams instances are not thread-safe.
Start fetching messages.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/jruby-kafka/consumer.rb', line 62 def begin if @reset_beginning == 'from-beginning' ZkUtils.maybeDeletePath(@properties[:zookeeper_connect], "/consumers/#{@properties[:group_id]}") end rescue ZkException => e raise KafkaError.new(e), "Got ZkException: #{e}" end key_decoder_i = Java::JavaClass.for_name(@key_decoder). constructor('kafka.utils.VerifiableProperties').new_instance nil msg_decoder_i = Java::JavaClass.for_name(@msg_decoder). constructor('kafka.utils.VerifiableProperties').new_instance nil if @topic topic_count_map = java.util.HashMap.new @topic => @num_streams @consumer. createMessageStreams(topic_count_map, key_decoder_i, msg_decoder_i)[@topic]. to_a else filter = @include_topics ? Whitelist.new(@include_topics) : Blacklist.new(@exclude_topics) @consumer. createMessageStreamsByFilter(filter, @num_streams, key_decoder_i, msg_decoder_i). to_a end end |
#shutdown ⇒ Object
Shutdown the consumer.
105 106 107 108 |
# File 'lib/jruby-kafka/consumer.rb', line 105 def shutdown @consumer.shutdown if @consumer nil end |