Class: Fluent::Plugin::PulsarOutput
- Inherits:
- Output
- Object
- Output
- Fluent::Plugin::PulsarOutput
- Defined in:
- lib/fluent/plugin/lib/out_pulsar.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #process(tag, es) ⇒ Object
- #send_to_pulsar(record) ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
24 25 26 27 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 24
#
# Passes the configuration straight to the parent implementation; this
# method adds no handling of its own.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @return whatever the parent +configure+ returns
def configure(conf)
  super(conf)
end
#process(tag, es) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 34
#
# Walks the event stream and hands every record to #send_to_pulsar, writing
# one debug log line per record before it is sent.
#
# @param tag not used by this method
# @param es [#each] event stream yielding (time, record) pairs
# @return the result of +es.each+
def process(tag, es)
  es.each do |_time, record|
    # The event time is ignored; only the record itself is forwarded.
    log.debug("Producing records to #{@pulsar_topic}")
    send_to_pulsar(record)
  end
end
#send_to_pulsar(record) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 40
#
# Sends a single record to @pulsar_topic. When the send does not report
# success it is attempted again, up to @num_retry additional times; once the
# limit is exceeded an error is logged and the record is dropped.
#
# The failure counter is kept in one loop for the lifetime of the call. A
# counter that is re-initialised on every attempt can never reach the limit,
# which turns a persistent broker failure into unbounded recursion.
#
# NOTE(review): `client` is not defined inside this method; it must resolve
# to a connected Pulsar client on this object -- confirm where it comes from.
#
# @param record the record passed as the last argument to +client.send+
# @return the result of the final log call (info on success, error on drop)
def send_to_pulsar(record)
  failures = 0

  # Success is strictly a literal `true` from client.send; any other return
  # value counts as a failed attempt.
  until client.send(@pulsar_topic, @pulsar_producer, @num_messages, record) == true
    log.warn "Failed to send, retrying.."
    failures += 1
    next if failures <= @num_retry

    return log.error("retry limit exceeded, dropping record..")
  end

  log.info "Successfully sent a record"
end
#start ⇒ Object
28 29 30 31 32 33 |
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 28
#
# Runs the parent start-up, then builds a Message::PulsarClient, connects it
# to @pulsar_host / @pulsar_port and logs that the connection was made.
#
# NOTE(review): the client created here is not stored on the instance, so it
# is unreachable once this method returns -- confirm how the sending path is
# expected to obtain a connected client.
#
# @return the result of the final +log.info+ call
def start
  super
  Message::PulsarClient.new.tap do |pulsar|
    pulsar.connect(@pulsar_host, @pulsar_port)
  end
  log.info("Connected to pulsar brokers successfully")
end