Class: Fluent::Plugin::PulsarOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/lib/out_pulsar.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
27
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 24

# Configures the plugin from the given configuration.
#
# No plugin-specific handling happens here: the call is forwarded unchanged
# to the parent implementation and its result is returned.
#
# @param conf [Object] configuration object forwarded to the parent class
# @return [Object] whatever the parent implementation returns
def configure(conf)
  super
end

#process(tag, es) ⇒ Object



34
35
36
37
38
39
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 34

# Forwards every record of an event stream to Pulsar, one record at a time.
#
# For each event a debug line naming @pulsar_topic is logged and the record
# is handed to #send_to_pulsar. The event time and the tag are not used.
#
# @param tag [Object] tag of the event stream (unused)
# @param es [#each] event stream yielding (time, record) pairs
# @return [Object] the result of es.each
def process(tag, es)
  es.each do |_time, record|
    log.debug("Producing records to #{@pulsar_topic}")
    send_to_pulsar(record)
  end
end

#send_to_pulsar(record) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 40

# Sends a single record to Pulsar with a bounded number of retries.
#
# The send is attempted once and, on failure, retried up to @num_retry more
# times (a nil @num_retry means no retries). The attempt counter lives in
# this one invocation, so the limit is actually enforced; the previous
# recursive version reset its counter on every call and never reached it.
#
# @param record [Object] the record passed to the client's send call
# @return [Boolean] true when an attempt reported success, false when the
#   retry limit was exceeded and the record was dropped
def send_to_pulsar(record)
  # Prefer a client stored on the instance; otherwise fall back to the
  # `client` call the method has always made.
  pulsar = @client || client
  max_retries = @num_retry.to_i
  failures = 0

  loop do
    # Only an explicit `true` counts as success, as before; the client's
    # return contract is not visible from here.
    if pulsar.send(@pulsar_topic, @pulsar_producer, @num_messages, record) == true
      log.info "Successfully sent a record"
      return true
    end

    log.warn "Failed to send, retrying.."
    failures += 1
    break if failures > max_retries
  end

  log.error "retry limit exceeded, dropping record.."
  false
end

#start ⇒ Object



28
29
30
31
32
33
# File 'lib/fluent/plugin/lib/out_pulsar.rb', line 28

# Starts the plugin and opens the connection to Pulsar.
#
# Runs the parent start logic first, then builds a Message::PulsarClient,
# connects it to @pulsar_host/@pulsar_port and keeps it in @client. Holding
# it in an instance variable (rather than a method-local) keeps the
# connected client reachable after this method returns.
#
# NOTE(review): the result of `connect` is not inspected here — confirm that
# it raises on failure, otherwise the success message is logged regardless.
#
# @return [Object] the result of the final log call
def start
  super
  @client = Message::PulsarClient.new
  @client.connect(@pulsar_host, @pulsar_port)
  log.info "Connected to pulsar brokers successfully"
end