Method: FluQ::Input::Kafka#process

Defined in:
lib/fluq/input/kafka.rb

#process(partition, messages) ⇒ Object

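Parses the given messages into events, tags each event with the configured topic and the partition, and passes the events to the worker.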

Parameters:

  • partition (Integer)
  • messages (Array<Poseidon::Message>)


55
56
57
58
59
60
61
62
63
64
# File 'lib/fluq/input/kafka.rb', line 55

# Turns a batch of messages into events, stamps every event with where it
# came from, and passes the whole batch to the worker.
#
# @param partition [Integer] partition written into each event's meta
# @param messages [Array<Poseidon::Message>] messages whose +value+ is parsed
# @return [Object] the result of +worker.process+
def process(partition, messages)
  # Parse everything first; each message value may yield several events.
  parsed = messages.each_with_object([]) do |message, acc|
    acc.concat(format.parse(message.value))
  end

  # Tag only after all parsing has succeeded.
  parsed.each { |evt| evt.meta.update(topic: config[:topic], partition: partition) }

  worker.process(parsed)
end