Class: RubyKafkaOutput

Inherits:
Fluent::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_ruby_kafka.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RubyKafkaOutput

Returns a new instance of RubyKafkaOutput.



28
29
30
31
32
33
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 28

# Builds the plugin instance and loads the libraries the plugin relies on.
#
# The requires live here rather than at file level, so the libraries are
# only loaded once an instance is actually created.
def initialize
  super
  require 'kafka'
  require 'zookeeper'
  require 'json'
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 35

# Resolves the Kafka seed brokers and creates the asynchronous producer.
#
# Broker resolution order:
# 1. If @zookeepers is set, every child of "/brokers/ids" is read from
#    Zookeeper and turned into a "host:port" entry.
# 2. If that produced nothing and @brokers is set, @brokers is treated as a
#    comma-separated list of "host:port" entries.
#
# @param conf [Object] plugin configuration, forwarded to the parent class
# @raise [Fluent::ConfigError] when no broker could be determined
def configure(conf)
  super
  @seed_brokers = []
  if @zookeepers
    z = Zookeeper.new(@zookeepers)
    begin
      z.get_children(:path => '/brokers/ids')[:children].each do |id|
        broker = Yajl.load(z.get(:path => "/brokers/ids/#{id}")[:data])
        @seed_brokers.push("#{broker['host']}:#{broker['port']}")
      end
    ensure
      # Always release the Zookeeper connection, even when a lookup or the
      # parsing of a broker node fails part-way through.
      z.close
    end
    log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
  end

  if @seed_brokers.empty? && @brokers
    # Tolerate whitespace around the separators ("a:9092, b:9092") and drop
    # blank entries left behind by stray commas.
    @seed_brokers = @brokers.split(",").map(&:strip).reject(&:empty?)
    log.info "brokers has been set directly: #{@seed_brokers}"
  end

  raise Fluent::ConfigError, "Broker has not been set." if @seed_brokers.empty?

  kafka = Kafka.new(seed_brokers: @seed_brokers, logger: @use_kafka_log ? log : nil)
  @producer = kafka.async_producer(
    max_queue_size: @max_queue_size,
    delivery_threshold: @delivery_threshold,
    delivery_interval: @delivery_interval,
  )
end

#emit(tag, es, chain) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 72

# Serializes each event of the stream as JSON and queues it on the Kafka
# producer.
#
# The destination topic is the record's 'topic' value, falling back to
# @default_topic and finally to the event tag. When the producer reports a
# full buffer, the call waits one second and tries again, up to
# @retry_count retries; after that the error is re-raised.
#
# @param tag [String] tag of the event stream
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; advanced before the events are produced
# @raise [Kafka::BufferOverflow] when the buffer is still full after all retries
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    # Add the optional keys to a shallow copy so they never leak into the
    # record object the caller handed over.
    payload = @output_include_time || @output_include_tag ? record.dup : record
    payload['time'] = time if @output_include_time
    payload['tag'] = tag if @output_include_tag
    topic = payload['topic'] || @default_topic || tag

    data = JSON.dump(payload)
    retry_counter = 0
    begin
      retry_counter += 1
      @producer.produce(data, topic: topic)
    rescue Kafka::BufferOverflow
      raise if retry_counter > @retry_count

      log.warn "Buffer overflow, backing off for 1s. #{retry_counter} time."
      sleep 1
      retry
    end
  end
end

#shutdown ⇒ Object



67
68
69
70
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 67

# Runs the parent class' shutdown, then shuts the Kafka producer down.
#
# Nothing further happens when no producer has been created.
def shutdown
  super
  return unless @producer

  @producer.shutdown
end

#start ⇒ Object



63
64
65
# File 'lib/fluent/plugin/out_ruby_kafka.rb', line 63

# Starts the plugin by delegating to the parent class; this plugin adds no
# start-up work of its own.
def start
  super
end