Class: Kafka::KafkaConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/jruby-kafka/kafka-consumer.rb

Constant Summary collapse

REQUIRED =
[
  :bootstrap_servers, :key_deserializer, :value_deserializer
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = {}) ⇒ KafkaConsumer

Create a Kafka high-level consumer.

For other configuration properties and their default values see kafka.apache.org/documentation.html#newconsumerconfigs and kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html.

Parameters:

  • config (Hash) (defaults to: {})

    the consumer configuration.

Options Hash (config):

  • :bootstrap_servers (String)

    A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.… Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). Required.

  • :key_deserializer (String)

    Deserializer class for key that implements the Deserializer interface. Required.

  • :value_deserializer (String)

    Deserializer class for value that implements the Deserializer interface. Required.

  • :topics (Array)

    The topic to consume from. Required.



26
27
28
29
30
31
# File 'lib/jruby-kafka/kafka-consumer.rb', line 26

def initialize(config={})
  Kafka::Utility.validate_arguments REQUIRED, config
  @properties      =  config.clone
  @stop_called     =  Concurrent::AtomicBoolean.new(false)
  super Kafka::Utility.java_properties @properties
end

Instance Attribute Details

#propertiesObject (readonly)

Returns the value of attribute properties.



33
34
35
# File 'lib/jruby-kafka/kafka-consumer.rb', line 33

def properties
  @properties
end

Instance Method Details

#stopObject



35
36
37
38
# File 'lib/jruby-kafka/kafka-consumer.rb', line 35

def stop
  @stop_called.make_true
  self.wakeup
end

#stop?Boolean

stop? should never be overriden

Returns:

  • (Boolean)


41
42
43
# File 'lib/jruby-kafka/kafka-consumer.rb', line 41

def stop?
  @stop_called.value
end