Class: RubyKafkaOutput
- Inherits: Fluent::Output
  - Object
  - Fluent::Output
  - RubyKafkaOutput
- Defined in: lib/fluent/plugin/out_ruby_kafka.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #initialize ⇒ RubyKafkaOutput (constructor): A new instance of RubyKafkaOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ RubyKafkaOutput
Returns a new instance of RubyKafkaOutput.
```ruby
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 28

def initialize
  super
  require "kafka"
  require "zookeeper"
  require "json"
end
```
Instance Method Details
#configure(conf) ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 35

def configure(conf)
  super

  @seed_brokers = []

  if @zookeepers
    z = Zookeeper.new(@zookeepers)
    z.get_children(:path => '/brokers/ids')[:children].each do |id|
      broker = Yajl.load(z.get(:path => "/brokers/ids/#{id}")[:data])
      @seed_brokers.push("#{broker['host']}:#{broker['port']}")
    end
    z.close
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end

  if @seed_brokers.empty? and @brokers
    @seed_brokers = @brokers.match(",").nil? ? [@brokers] : @brokers.split(",")
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  raise Fluent::ConfigError, "Broker has not been set." if @seed_brokers.empty?

  kafka = Kafka.new(seed_brokers: @seed_brokers, logger: @use_kafka_log ? log : nil)
  @producer = kafka.async_producer(
    max_queue_size: @max_queue_size,
    delivery_threshold: @delivery_threshold,
    delivery_interval: @delivery_interval,
  )
end
```
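configure resolves the seed broker list, either by reading /brokers/ids from Zookeeper or by splitting the comma-separated brokers string, and then builds a ruby-kafka async producer from the tuning parameters. The following is a minimal sketch of the same ruby-kafka calls used directly, outside Fluentd; the broker addresses, topic name, and tuning values are illustrative placeholders, not the plugin's defaults.

```ruby
require "kafka"
require "logger"

# Broker addresses here are placeholders; the plugin fills them in from
# Zookeeper (/brokers/ids) or from the comma-separated @brokers setting.
kafka = Kafka.new(
  seed_brokers: ["kafka1:9092", "kafka2:9092"],
  logger: Logger.new($stdout)   # the plugin passes its own log only when @use_kafka_log is set
)

# Same producer options configure passes through; the values are illustrative.
producer = kafka.async_producer(
  max_queue_size: 1000,       # messages buffered before Kafka::BufferOverflow is raised
  delivery_threshold: 100,    # flush once this many messages are buffered
  delivery_interval: 10       # flush at least every 10 seconds
)

producer.produce('{"message":"hello"}', topic: "test-topic")
producer.shutdown   # stop the background delivery thread
```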
#emit(tag, es, chain) ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 72

def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    record['time'] = time if @output_include_time
    record['tag'] = tag if @output_include_tag
    topic = record['topic'] || @default_topic || tag
    data = JSON.dump(record)
    retry_counter = 0
    begin
      retry_counter += 1
      @producer.produce(data, topic: topic)
    rescue Kafka::BufferOverflow
      if retry_counter <= @retry_count
        log.warn "Buffer overflow, backing off for 1s. #{retry_counter} time."
        sleep 1
        retry
      else
        raise
      end
    end
  end
end
```
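emit follows the classic Fluent::Output interface (tag, event stream, chain): each record is optionally annotated with the event time and tag, routed to the record's 'topic' field, the default_topic, or the Fluentd tag, serialized with JSON.dump, and handed to the async producer, retrying with a one-second sleep when the producer's queue overflows. Below is a sketch of that produce-and-retry pattern in isolation; the helper name, retry limit, and payload are hypothetical and assume a ruby-kafka async producer is already available.

```ruby
require "kafka"
require "json"

# Hypothetical helper mirroring emit's retry loop; `producer` is an
# already-built ruby-kafka async producer and `retries` stands in for @retry_count.
def produce_with_backoff(producer, record, topic:, retries: 3)
  data = JSON.dump(record)
  attempts = 0
  begin
    attempts += 1
    producer.produce(data, topic: topic)
  rescue Kafka::BufferOverflow
    # The producer's in-memory queue is full; give the delivery thread a moment to drain it.
    raise if attempts > retries
    sleep 1
    retry
  end
end

# Example call; the record and topic are illustrative.
# produce_with_backoff(producer, { "message" => "hello", "tag" => "app.access" }, topic: "logs")
```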
#shutdown ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 67

def shutdown
  super
  @producer.shutdown if @producer
end
```
#start ⇒ Object
```ruby
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 63

def start
  super
end
```