Class: Fluent::Plugin::PulsarInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/lib/in_pulsar.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



47
48
49
50
# File 'lib/fluent/plugin/lib/in_pulsar.rb', line 47

def configure(conf)
  super

end

#startObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/lib/in_pulsar.rb', line 51

def start
  super
  client = Message::PulsarClient.new()
  client.connect(@pulsar_host, @pulsar_port)
  client.subscribe(@pulsar_topic, @pulsar_subscription, @pulsar_subtype)
  while true do
    m = client.get_message()
    if m != nil
      time = Fluent::Engine.now
      record = {"message_entry_id"=>m.message_entry_id, "message_ledger_id"=>m.message_ledger_id,  "client_created_id"=>m.client_created_id, "message"=>m.message}
      client.ack(m.client_created_id, m.message_ledger_id, m.message_entry_id)
      router.emit(@tag, time, record)
    else
      sleep @pull_duration.to_f
    end
  end
end