Class: Fluent::Plugin::PulsarOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::PulsarOutput
- Defined in:
- lib/fluent/plugin/lib/out_pulsar.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #proces(tag, es) ⇒ Object
- #refresh_brokers(raise_error = true) ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 26 def configure(conf) super end |
#proces(tag, es) ⇒ Object
51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 51 def proces(tag,es) es.each do |time, record| log.debug "Publishing records to #{@pulsar_topic}" @producer.send(@pulsar_topic,@pulsar_producer,,record) log.debug "Successfully sent a record to #{@pulsar_topic}" end end |
#refresh_brokers(raise_error = true) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 30 def refresh_brokers(raise_error = true) begin if @pulsar_host != nil && @pulsar_port != nil @producer = Message::PulsarClient.new() @producer.connect(@pulsar_host, @pulsar_port) log.info "Connected to pulsar brokers successfully" end rescue Exception => e if raise_error raise e else log.error e end end end |
#start ⇒ Object
46 47 48 49 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 46 def start super refresh_brokers end |