Class: Kafka::Consumer

Inherits: Object
Defined in:
lib/jruby-kafka/consumer.rb

Instance Method Summary

Constructor Details

#initialize(config = {}) ⇒ Consumer

Create a Kafka high-level consumer.

One and only one of :topic, :include_topics, or :exclude_topics must be provided.

For other configuration properties and their default values see kafka.apache.org/08/configuration.html#consumerconfigs and github.com/apache/kafka/blob/0.8.2.2/core/src/main/scala/kafka/consumer/ConsumerConfig.scala#L90-L182.

Parameters:

  • config (Hash) (defaults to: {})

    the consumer configuration.

Options Hash (config):

  • :zookeeper_connect (String)

    The connection string for the Zookeeper connection, in the form “host:port[,host:port]*”. Required.

  • :group_id (String)

    The consumer group this instance is a member of. Required.

  • :topic (String)

    The topic to consume from.

  • :include_topics (String)

    The inclusive (white) regular expression filter matching topics to consume from.

  • :exclude_topics (String)

    The exclusive (black) regular expression filter matching topics not to consume from.

  • :num_streams (Integer) — default: 1

    The number of message streams to create.

  • :key_decoder (String) — default: 'kafka.serializer.DefaultDecoder'

    Java class name for message key decoder.

  • :msg_decoder (String) — default: 'kafka.serializer.DefaultDecoder'

    Java class name for message value decoder.

  • :reset_beginning (String)

    Delete existing consumer group offsets in Zookeeper. Must be “from-beginning” to have any effect, and :auto_offset_reset must also be “smallest”.
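For example, a consumer reading every topic matching a prefix might be configured as below. This is a minimal sketch: the broker address, group name, and regex are hypothetical, and the constructor call is guarded so it only runs where the jruby-kafka gem is loaded under JRuby.

```ruby
# Hypothetical connection details. Exactly one of :topic, :include_topics,
# or :exclude_topics may be given; here a whitelist regex is used.
config = {
  :zookeeper_connect => 'localhost:2181',
  :group_id          => 'event-readers',
  :include_topics    => 'events\..*',
  :num_streams       => 2
}

# Requires JRuby with jruby-kafka loaded; guarded so the sketch is inert elsewhere.
if defined?(Kafka::Consumer)
  consumer = Kafka::Consumer.new(config)
  consumer.connect
end
```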



# File 'lib/jruby-kafka/consumer.rb', line 38

def initialize(config={})
  validate_arguments config

  @properties      =  config.clone
  @topic           =  @properties.delete :topic
  @include_topics  =  @properties.delete :include_topics
  @exclude_topics  =  @properties.delete :exclude_topics
  @num_streams     = (@properties.delete(:num_streams) || 1).to_java Java::int
  @key_decoder     =  @properties.delete(:key_decoder) || 'kafka.serializer.DefaultDecoder'
  @msg_decoder     =  @properties.delete(:msg_decoder) || 'kafka.serializer.DefaultDecoder'
  @reset_beginning =  @properties.delete :reset_beginning
  @consumer        =  nil
end

Instance Method Details

#commitOffsets ⇒ Object

Commit the offsets of all topic partitions consumed by this consumer.

Useful for when the :auto_commit_enable configuration parameter is false.

Returns:

  • void



# File 'lib/jruby-kafka/consumer.rb', line 105

def commitOffsets
  @consumer.commitOffsets
end
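When :auto_commit_enable is “false”, offsets survive only when committed explicitly. One common pattern is to commit after every batch of processed messages; the sketch below uses illustrative placeholders (BATCH_SIZE and process are not library API).

```ruby
# Commit after every N messages instead of relying on the auto-commit timer.
# BATCH_SIZE and process() are illustrative placeholders, not library API.
BATCH_SIZE = 100

def drain(stream, consumer, batch_size = BATCH_SIZE)
  count = 0
  stream.each do |message|
    process(message)            # hypothetical application handler
    count += 1
    consumer.commitOffsets if (count % batch_size).zero?
  end
end
```

Note that commitOffsets records progress for every partition this consumer owns, not just the one the current message came from.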

#connect ⇒ Object

Create connection to Kafka and Zookeeper.

Returns:

  • void



# File 'lib/jruby-kafka/consumer.rb', line 55

def connect
  @consumer = Consumer.createJavaConsumerConnector(
    ConsumerConfig.new(Kafka::Utility.java_properties(@properties)))
end

#message_streams ⇒ Array<Java::KafkaConsumer::KafkaStream>

Note:

KafkaStreams instances are not thread-safe.

Start fetching messages.

Returns:

  • (Array<Java::KafkaConsumer::KafkaStream>)

    list of streams, as specified by the :num_streams configuration parameter. A stream is essentially a queue of incoming messages from Kafka topic partitions.


# File 'lib/jruby-kafka/consumer.rb', line 68

def message_streams
  connect if @consumer.nil?
  begin
    if @reset_beginning == 'from-beginning'
      ZkUtils.maybeDeletePath(@properties[:zookeeper_connect], "/consumers/#{@properties[:group_id]}")
    end
  rescue ZkException => e
    raise KafkaError.new(e), "Got ZkException: #{e}"
  end
  key_decoder_i = Java::JavaClass.for_name(@key_decoder).
    constructor('kafka.utils.VerifiableProperties').new_instance nil
  msg_decoder_i = Java::JavaClass.for_name(@msg_decoder).
    constructor('kafka.utils.VerifiableProperties').new_instance nil

  if @topic
    topic_count_map = java.util.HashMap.new @topic => @num_streams
    @consumer.
      createMessageStreams(topic_count_map, key_decoder_i, msg_decoder_i)[@topic].
      to_a

  else
    filter =  @include_topics ? 
      Whitelist.new(@include_topics) :
      Blacklist.new(@exclude_topics)

    @consumer.
      createMessageStreamsByFilter(filter, @num_streams, key_decoder_i, msg_decoder_i).
      to_a

  end
end
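Because KafkaStream instances are not thread-safe, the usual pattern is to dedicate one thread to each stream returned by #message_streams. A sketch of that pattern, with the handler supplied by the caller:

```ruby
# One worker thread per stream: KafkaStream instances are not thread-safe,
# so each stream must be consumed by exactly one thread.
# `streams` is whatever #message_streams returned.
def consume_in_threads(streams, &handler)
  streams.map do |stream|
    Thread.new { stream.each { |message| handler.call(message) } }
  end
end
```

Against a connected consumer this would be used roughly as `consume_in_threads(consumer.message_streams) { |m| puts m.message }` followed by joining the returned threads, where each yielded `m` is a Kafka MessageAndMetadata object.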

#shutdown ⇒ Object

Shutdown the consumer.

Returns:

  • void



# File 'lib/jruby-kafka/consumer.rb', line 112

def shutdown
  @consumer.shutdown if @consumer
  @consumer = nil
  nil
end
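Putting the lifecycle together, a defensive wrapper can guarantee #shutdown runs even when consumption raises; since #shutdown checks for a live connection before acting, calling it is safe in all paths. A sketch (the helper name is illustrative):

```ruby
# Run a block against a connected consumer and always release the
# Kafka/Zookeeper resources, even when the block raises.
def with_consumer(consumer)
  consumer.connect
  yield consumer
ensure
  consumer.shutdown
end
```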