Class: Fluent::Plugin::DeisOutput

Inherits:
Output
  • Object
show all
Includes:
Mixin::Deis
Defined in:
lib/fluent/plugin/out_deis.rb

Instance Method Summary collapse

Methods included from Mixin::Deis

#build_series, #from_container?, #from_router?, #kubernetes?, #parse_router_log

Constructor Details

#initialize ⇒ DeisOutput

Returns a new instance of DeisOutput.



43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_deis.rb', line 43

# Builds the output with no Kafka client yet and an empty producer cache.
def initialize
  super
  # Required inside the constructor, so these libraries are loaded when an
  # instance is built rather than when this file is loaded.
  require 'kafka'
  require 'fluent/plugin/kafka_producer_ext'

  # Kafka client; assigned by refresh_client and cleared by shutdown.
  @kafka = nil
  # Producers keyed by Thread#object_id (filled in by get_producer).
  @producers = {}
  # Held by get_producer and shutdown_producers while they touch @producers.
  @producers_mutex = Mutex.new
end

Instance Method Details

#configure(conf) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/fluent/plugin/out_deis.rb', line 104

# Applies the plugin configuration and derives the option hash that is later
# handed to the Kafka client when a producer is created.
#
# @param conf the configuration element, forwarded untouched to the parent class
# @return [Object, nil] nil unless the discard warning was logged
def configure(conf)
  super

  # Retry count and ack level are always present; the remaining options are
  # only included when they have been configured.
  @producer_opts = { max_retries: @max_send_retries, required_acks: @required_acks }
  @producer_opts.store(:ack_timeout, @ack_timeout) if @ack_timeout
  @producer_opts.store(:compression_codec, @compression_codec.to_sym) if @compression_codec

  # Dropping events silently is surprising, so make the operator aware of it.
  if @discard_kafka_delivery_failed
    log.warn "'discard_kafka_delivery_failed' option discards events which "\
             'cause delivery failure, e.g. invalid topic or something.'
    log.warn 'If this is unexpected, you need to check your configuration or data.'
  end
end

#deliver_messages(producer, tag) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_deis.rb', line 82

# Flushes the messages buffered in +producer+.
#
# A Kafka::DeliveryFailed error is only swallowed when
# @discard_kafka_delivery_failed is set: the failure is logged and the
# producer's buffer is cleared. Otherwise the error reaches the caller.
#
# @param producer the producer whose buffer is flushed
# @param tag the tag reported in the warning when events are discarded
def deliver_messages(producer, tag)
  producer.deliver_messages
rescue Kafka::DeliveryFailed => e
  # Without the discard option the failure must propagate unchanged.
  raise unless @discard_kafka_delivery_failed

  log.warn 'DeliveryFailed occurred. Discard broken event:',
           error: e.to_s, error_class: e.class.to_s, tag: tag
  producer.clear_buffer
end

#filter_record(record) ⇒ Object

# Forwards to the parent implementation, passing +tag+ a second time as the
# fourth argument.
# NOTE(review): this fragment appears inside the rendered description of
# filter_record, so it is presumably disabled code in the source file — confirm
# before relying on it.
def emit(tag, es, chain)

super(tag, es, chain, tag)

end



121
122
123
124
125
126
127
128
129
130
# File 'lib/fluent/plugin/out_deis.rb', line 121

# Converts one router record into newline-separated InfluxDB point dumps.
#
# @param record the event record to convert
# @return [String, nil] one dumped point per line, or nil when the record does
#   not come from the router, yields no series, or cannot be converted
def filter_record(record)
  return unless from_router?(record)
  data = build_series(record)
  return unless data
  data.map { |point| InfluxDB::PointValue.new(point).dump }.join("\n")
rescue StandardError => e
  # Only ordinary errors are swallowed; signals and exit requests propagate.
  # The failure goes through the plugin logger (with the message, not just the
  # backtrace) instead of being printed to stdout.
  log.warn 'failed to convert record. Skip event:',
           error: e.to_s, error_class: e.class.to_s, record: record
  log.debug { "Exception Backtrace : #{Array(e.backtrace).join("\n")}" }
  nil
end

#get_producer ⇒ Object

rubocop:disable AccessorMethodName



71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_deis.rb', line 71

# Returns the Kafka producer cached for the calling thread, creating it from
# @producer_opts on first use. The cache is keyed by Thread#object_id and is
# only touched while @producers_mutex is held.
#
# @return the producer belonging to the current thread
def get_producer # rubocop:disable AccessorMethodName
  @producers_mutex.synchronize do
    # The options are splatted as keywords: on Ruby 3 a Hash passed
    # positionally is no longer converted to keyword arguments.
    @producers[Thread.current.object_id] ||= @kafka.producer(**@producer_opts)
  end
end

#refresh_client(raise_error = true) ⇒ Object



96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_deis.rb', line 96

# (Re)creates the Kafka client from the comma-separated @brokers list.
#
# @param raise_error [Boolean] when true a failure is re-raised; when false it
#   is only logged and @kafka keeps its previous value
# @raise [StandardError] the client construction error, if raise_error is true
def refresh_client(raise_error = true)
  # Tolerate whitespace and empty entries, e.g. "a:9092, b:9092,".
  seed_brokers = @brokers.split(',').map(&:strip).reject(&:empty?)
  @kafka = Kafka.new(seed_brokers: seed_brokers, client_id: @client_id)
  log.info "initialized kafka producer: #{@client_id}"
rescue StandardError => e
  # Only ordinary errors are handled here; signals and exit requests are
  # never swallowed, even when raise_error is false.
  raise if raise_error
  log.error e
end

#shutdown ⇒ Object



58
59
60
61
62
# File 'lib/fluent/plugin/out_deis.rb', line 58

# Stops the output: runs the parent shutdown hook, shuts down every cached
# producer and finally drops the Kafka client reference.
def shutdown
  super
  shutdown_producers
  @kafka = nil
end

#shutdown_producers ⇒ Object



64
65
66
67
68
69
# File 'lib/fluent/plugin/out_deis.rb', line 64

# Shuts down every cached producer and replaces the cache with an empty hash,
# all while holding @producers_mutex.
#
# @return [Hash] the new, empty producer cache
def shutdown_producers
  @producers_mutex.synchronize do
    @producers.each do |_thread_id, producer|
      producer.shutdown
    end
    @producers = {}
  end
end

#start ⇒ Object



53
54
55
56
# File 'lib/fluent/plugin/out_deis.rb', line 53

# Runs the parent start hook, then creates the Kafka client. refresh_client is
# called with its default raise_error = true, so a client construction failure
# propagates out of start.
def start
  super
  refresh_client
end

#write(chunk) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/fluent/plugin/out_deis.rb', line 132

# Converts every record of +chunk+ with filter_record and produces the
# resulting lines to @metrics_topic, delivering them in batches.
#
# A batch is delivered before the next message is added once the accumulated
# size would exceed @kafka_agg_max_bytes or the message count has reached
# @kafka_agg_max_messages (when set). Records for which filter_record returns
# nil, that are larger than @max_send_limit_bytes (when set), or that fail to
# convert are skipped.
#
# @param chunk the buffer chunk to send; its tag is read from chunk.metadata.tag
# @raise [Exception] any failure is re-raised after the producers and the
#   client have been rebuilt
def write(chunk)
  tag = chunk.metadata.tag
  producer = get_producer

  records_by_topic = {}
  bytes_by_topic = {}
  messages = 0
  messages_bytes = 0
  Fluent::Engine.msgpack_factory.unpacker(chunk.open).each do |time, record|
    begin
      topic = @metrics_topic
      records_by_topic[topic] ||= 0
      bytes_by_topic[topic] ||= 0
      line = filter_record(record)

      next unless line
      record_buf_bytes = line.bytesize
      if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
        log.warn 'record size exceeds max_send_limit_bytes. Skip event:',
                 time: time, record: record
        next
      end
    rescue StandardError => e
      log.warn 'unexpected error during format record. Skip broken event:',
               error: e.to_s, error_class: e.class.to_s, time: time, record: record
      next
    end

    # Flush the current batch first when adding this message would exceed the
    # byte budget or the batch already holds the maximum number of messages.
    # An empty batch is never flushed.
    if messages.positive? &&
       ((messages_bytes + record_buf_bytes > @kafka_agg_max_bytes) ||
        (@kafka_agg_max_messages && messages >= @kafka_agg_max_messages))
      log.debug do
        "#{messages} messages send because reaches the limit of batch transmission."
      end
      deliver_messages(producer, tag)
      messages = 0
      messages_bytes = 0
    end

    # Only names that exist in this method are interpolated; the block is
    # evaluated lazily, so a bad reference would only fail at trace level.
    log.trace do
      "message will send to #{topic} with value: #{line}."
    end

    messages += 1
    producer.produce2(
      line,
      topic: topic
    )
    messages_bytes += record_buf_bytes

    records_by_topic[topic] += 1
    bytes_by_topic[topic] += record_buf_bytes
  end
  if messages.positive?
    log.debug { "#{messages} messages send." }
    deliver_messages(producer, tag)
  end
  log.debug { "(records|bytes) (#{records_by_topic}|#{bytes_by_topic})" }
rescue Exception => e # rubocop:disable RescueException
  log.warn "Send exception occurred: #{e}"
  log.warn "Exception Backtrace : #{e.backtrace.join("\n")}"
  # For safety, refresh client and its producers
  shutdown_producers
  refresh_client(false)
  # Raise exception to retry sending messages
  raise e
end