Class: CC::Kafka::Consumer
- Inherits:
- Object
- Object
- CC::Kafka::Consumer
- Defined in:
- lib/cc/kafka/consumer.rb
Constant Summary collapse
- MESSAGE_OFFSET_KEY =
"kafka_message_offset".freeze
Instance Method Summary collapse
- #close ⇒ Object
- #fetch ⇒ Object
- #initialize(client_id, seed_brokers, topic, partition) ⇒ Consumer (constructor) — A new instance of Consumer.
- #on_message(&block) ⇒ Object
- #on_start(&block) ⇒ Object
- #on_stop(&block) ⇒ Object
- #pause ⇒ Object
- #paused? ⇒ Boolean
- #set_offset(current_offset) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #unpause ⇒ Object
Constructor Details
#initialize(client_id, seed_brokers, topic, partition) ⇒ Consumer
Returns a new instance of Consumer.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/cc/kafka/consumer.rb', line 8 def initialize(client_id, seed_brokers, topic, partition) @offset = Kafka.offset_model.find_or_create!( topic: topic, partition: partition, ) Kafka.logger.debug("offset: #{@offset.topic}/#{@offset.partition} #{current_offset(@offset)}") @consumer = Poseidon::PartitionConsumer.consumer_for_partition( client_id, seed_brokers, @offset.topic, @offset.partition, current_offset(@offset) ) @paused = false end |
Instance Method Details
#close ⇒ Object
82 83 84 |
# File 'lib/cc/kafka/consumer.rb', line 82 def close @consumer.close end |
#fetch ⇒ Object
71 72 73 74 75 76 |
# File 'lib/cc/kafka/consumer.rb', line 71 def fetch @consumer.fetch rescue Poseidon::Errors::UnknownTopicOrPartition Kafka.logger.debug("topic #{@offset.topic.inspect} not created yet") [] end |
#on_message(&block) ⇒ Object
34 35 36 |
# File 'lib/cc/kafka/consumer.rb', line 34 def on_message(&block) @on_message = block end |
#on_start(&block) ⇒ Object
26 27 28 |
# File 'lib/cc/kafka/consumer.rb', line 26 def on_start(&block) @on_start = block end |
#on_stop(&block) ⇒ Object
30 31 32 |
# File 'lib/cc/kafka/consumer.rb', line 30 def on_stop(&block) @on_stop = block end |
#pause ⇒ Object
59 60 61 |
# File 'lib/cc/kafka/consumer.rb', line 59 def pause @paused = true end |
#paused? ⇒ Boolean
67 68 69 |
# File 'lib/cc/kafka/consumer.rb', line 67 def paused? @paused end |
#set_offset(current_offset) ⇒ Object
78 79 80 |
# File 'lib/cc/kafka/consumer.rb', line 78 def set_offset(current_offset) @offset.set(current: current_offset) end |
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/cc/kafka/consumer.rb', line 38 def start trap(:TERM) { stop } @running = true @on_start.call(@offset) if @on_start while @running do unless @paused end Kafka.logger.info("shutting down due to TERM signal") ensure @on_stop.call(@offset) if @on_stop close end |
#stop ⇒ Object
55 56 57 |
# File 'lib/cc/kafka/consumer.rb', line 55 def stop @running = false end |
#unpause ⇒ Object
63 64 65 |
# File 'lib/cc/kafka/consumer.rb', line 63 def unpause @paused = false end |