Method: FluQ::Input::Kafka#process
Defined in: lib/fluq/input/kafka.rb
#process(partition, messages) ⇒ Object
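Processes a batch of messages from one partition: parses each message value into events with the configured format, tags every event with the topic and partition, and passes the events to the worker.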
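# File 'lib/fluq/input/kafka.rb', line 55

def process(partition, messages)
  # Parse every message value into events
  events = []
  messages.each do |m|
    events.concat format.parse(m.value)
  end
  # Tag each event with its source topic and partition
  events.each do |event|
    event..update topic: config[:topic], partition: partition
  end
  # Hand the whole batch to the worker
  worker.process events
end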
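The parse, tag, and hand-off flow of the method can be tried in isolation with a minimal sketch. Everything below is an assumption made for illustration and is not part of FluQ: the `SketchMessage`, `SketchEvent`, `SketchFormat`, `SketchWorker`, and `SketchInput` classes are hypothetical stand-ins, the `tags` accessor is an illustrative name, and newline-delimited JSON is just one possible format.

```ruby
require 'json'

# Hypothetical stand-ins; none of these classes are part of FluQ.
SketchMessage = Struct.new(:value)
SketchEvent   = Struct.new(:data, :tags)

# Assumed format: one JSON document per line of the message value.
class SketchFormat
  def parse(payload)
    payload.each_line.map(&:strip).reject(&:empty?).map do |line|
      SketchEvent.new(JSON.parse(line), {})
    end
  end
end

# Collects whatever it is handed, in place of a real worker.
class SketchWorker
  attr_reader :received

  def initialize
    @received = []
  end

  def process(events)
    @received.concat(events)
  end
end

class SketchInput
  attr_reader :format, :worker, :config

  def initialize(topic)
    @config = { topic: topic }
    @format = SketchFormat.new
    @worker = SketchWorker.new
  end

  # Same three steps as the listing: parse, tag, hand off.
  def process(partition, messages)
    events = messages.flat_map { |m| format.parse(m.value) }
    events.each do |event|
      event.tags.update(topic: config[:topic], partition: partition)
    end
    worker.process(events)
  end
end

input = SketchInput.new('clicks')
input.process(3, [
  SketchMessage.new(%({"id":1}\n{"id":2})),
  SketchMessage.new(%({"id":3}))
])
```

After the call, `input.worker.received` holds three events, each carrying the `clicks` topic and partition `3` in its `tags` hash. One message can yield several events, which is why the results are flattened into a single batch before tagging.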