Class: Anschel::Input::Kafka

Inherits:
Base
  • Object
show all
Defined in:
lib/anschel/input/kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue, config, stats, log) ⇒ Kafka

Returns a new instance of Kafka.



9
10
11
12
13
# File 'lib/anschel/input/kafka.rb', line 9

# Sets up the Kafka input and starts its consumer group.
#
# Logs the supplied configuration at trace level, builds a
# ::Kafka::Group from it, and calls +run+ on that group with
# +num_cpus+ and the queue.
#
# @param queue  object handed to the consumer group's +run+ call
#   (presumably where consumed messages are pushed — confirm against
#   ::Kafka::Group)
# @param config configuration passed unchanged to ::Kafka::Group.new
# @param stats  accepted for signature compatibility; not used here
# @param log    logger; must respond to +trace+
def initialize(queue, config, stats, log)
  log.trace(event: 'input', kind: 'kafka', config: config)

  group = ::Kafka::Group.new(config)
  @consumer_group = group

  # num_cpus is resolved on self; it is not defined in this method.
  group.run(num_cpus, queue)
end

Instance Method Details

#stop ⇒ Object



15
16
17
18
19
# File 'lib/anschel/input/kafka.rb', line 15

# Shuts down the consumer group the first time it is called.
#
# Later calls do nothing, because @stopped is set once shutdown has
# returned. If +shutdown+ raises, @stopped stays unset and a later call
# will attempt the shutdown again.
#
# @return [true, nil] true when this call performed the shutdown,
#   nil when the input was already stopped
def stop
  unless @stopped
    @consumer_group.shutdown
    @stopped = true
  end
end