51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/fluent/plugin/lib/in_pulsar.rb', line 51
def start
super
client = Message::PulsarClient.new()
client.connect(@pulsar_host, @pulsar_port)
client.subscribe(@pulsar_topic, @pulsar_subscription, @pulsar_subtype)
while true do
m = client.get_message()
if m != nil
time = Fluent::Engine.now
record = {"message_entry_id"=>m.message_entry_id, "message_ledger_id"=>m.message_ledger_id, "client_created_id"=>m.client_created_id, "message"=>m.message}
client.ack(m.client_created_id, m.message_ledger_id, m.message_entry_id)
router.emit(@tag, time, record)
else
sleep @pull_duration.to_f
end
end
end
|