Class: Fluent::Plugin::KafkaStatusInput

Inherits:
Input
  • Object
show all
Includes:
KafkaStatusUtils
Defined in:
lib/fluent/plugin/in_kafka_status.rb

Constant Summary collapse

NAME =
'kafka_status'
DEFAULT_TAG =
NAME
DEFAULT_BROKERS =
['localhost:9092']
DEFAULT_CLIENT_ID =
"fluentd:#{NAME}"
DEFAULT_INTERVAL =
3600
DEFAULT_TIMESTAMP_FORMAT =
:iso
DEFAULT_EVENT_PREFIX =
''
MAX_RETRIES =
3

Instance Method Summary collapse

Methods included from KafkaStatusUtils

#read_file

Constructor Details

#initialize ⇒ KafkaStatusInput

Returns a new instance of KafkaStatusInput.



69
70
71
72
73
# File 'lib/fluent/plugin/in_kafka_status.rb', line 69

# Returns a new instance of KafkaStatusInput.
#
# @kafka caches the Kafka client; it starts out nil and is created
# lazily on first use (see #kafka / #new_kafka).
def initialize
  super

  @kafka = nil
end

Instance Method Details

#close_kafka ⇒ Object



213
214
215
216
217
# File 'lib/fluent/plugin/in_kafka_status.rb', line 213

# Closes the cached Kafka client, best-effort: errors raised by close
# are swallowed.
#
# Fix: @kafka is reset in an ensure block so the stale client is always
# dropped. Previously, when close raised, the rescue fired before
# `@kafka = nil` executed, leaving the broken client cached for the next
# #kafka call to reuse.
def close_kafka
  @kafka&.close
rescue StandardError
  # ignore close errors — cleanup is handled in ensure
ensure
  @kafka = nil
end

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/in_kafka_status.rb', line 75

# Validates the plugin configuration after Fluentd has parsed it.
#
# @param conf [Fluent::Config::Element] parsed plugin configuration
# @raise [Fluent::ConfigError] when the tag is blank, no broker is
#   configured, or any configured SSL path is not an existing file
def configure(conf)
  super

  raise Fluent::ConfigError, 'tag should not be empty' if tag.nil? || tag.empty?
  raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.' if brokers.empty?

  # Each SSL option is optional, but when set it must point at a real file.
  { 'ssl_ca_cert' => ssl_ca_cert,
    'ssl_client_cert' => ssl_client_cert,
    'ssl_client_cert_key' => ssl_client_cert_key }.each do |option, path|
    raise Fluent::ConfigError, "#{option} should be a file." if path && !File.file?(path)
  end
end

#events_for_cluster_brokers ⇒ Object

Emits two cluster metrics: metric 4 — brokers_count (family: cluster, name: brokers_count, value: broker count), and metric 5 — broker_active (family: cluster, name: broker_active, value: 1, tags_member_host: &lt;host&gt;), one event per broker.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/in_kafka_status.rb', line 159

# Builds the cluster-level status events: one brokers_count event plus
# one broker_active event per broker.
#
# NOTE(review): the broker tag key is the literal 'tags_member_host',
# unlike #events_for_topics which prefixes its tag with event_prefix —
# confirm that asymmetry is intentional.
#
# @return [Array<Hash>] cluster metric events
def events_for_cluster_brokers
  kafka do |kafka|
    timestamp = Time.now.utc.send("to_#{timestamp_format}")
    broker_list = kafka.brokers

    count_event = {
      'timestamp' => timestamp,
      'metric_family' => 'cluster',
      'metric_name' => 'brokers_count',
      'metric_value' => broker_list.size
    }

    active_events = broker_list.map do |broker|
      {
        'timestamp' => timestamp,
        'metric_family' => 'cluster',
        'metric_name' => 'broker_active',
        'metric_value' => 1,
        'tags_member_host' => broker.host
      }
    end

    [count_event, *active_events]
  end
end

#events_for_topics ⇒ Object

Emits three per-topic metrics: metric 1 — active (family: topic, name: active, value: 1, tags_topic: &lt;topic&gt;); metric 2 — partitions_count (family: topic, name: configured_partitions_count, value: partition count, tags_topic: &lt;topic&gt;); metric 3 — replica_count (family: topic, name: replica_count, value: replica count, tags_topic: &lt;topic&gt;).



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/in_kafka_status.rb', line 122

# Builds the per-topic status events: for every topic, an 'active'
# marker, the configured partition count, and the replica count.
#
# @return [Array<Hash>] topic metric events, three per topic
def events_for_topics
  kafka do |kafka|
    timestamp = Time.now.utc.send("to_#{timestamp_format}")
    kafka.topics.flat_map do |topic|
      # (metric_name, metric_value) pairs emitted for each topic
      [
        ['active', 1],
        ['configured_partitions_count', kafka.partitions_for(topic)],
        ['replica_count', kafka.replica_count_for(topic)]
      ].map do |metric_name, metric_value|
        {
          'timestamp' => timestamp,
          'metric_family' => 'topic',
          'metric_name' => metric_name,
          'metric_value' => metric_value,
          "#{event_prefix}topic" => topic
        }
      end
    end
  end
end

#kafka(max_retries = MAX_RETRIES) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/in_kafka_status.rb', line 183

# Yields the memoized Kafka client, reconnecting with bounded retries.
#
# On any StandardError the error is logged, the cached client is
# discarded, and the attempt is retried after retry_interval seconds,
# up to max_retries times before re-raising.
#
# @param max_retries [Integer] attempts before the error is re-raised
# @param retry_interval [Numeric] seconds to wait between attempts
#   (generalizes the previously hard-coded 30; default keeps behavior)
# @return [Object] the block's result, or nil when no block is given
def kafka(max_retries = MAX_RETRIES, retry_interval = 30)
  retry_count = 0
  begin
    @kafka ||= new_kafka
    yield @kafka if block_given?
  rescue StandardError => e
    log.error "exception with kafka connection: #{e}"
    raise e if (retry_count += 1) > max_retries

    close_kafka
    sleep retry_interval
    retry
  end
end

#kafka_status ⇒ Object



99
100
101
102
103
104
105
# File 'lib/fluent/plugin/in_kafka_status.rb', line 99

# Collects all topic and cluster events and emits each one to the
# router under the configured tag with the current Fluentd event time.
def kafka_status
  all_events = events_for_topics + events_for_cluster_brokers
  time = Fluent::Engine.now
  all_events.each { |event| router.emit(tag, time, event) }
end

#new_kafka ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/fluent/plugin/in_kafka_status.rb', line 198

# Builds a fresh Kafka client from the plugin configuration. SSL
# material is passed as file contents via KafkaStatusUtils#read_file,
# and system CA certs are enabled whenever a CA cert is configured.
#
# NOTE(review): the log line says "producer" although this client only
# reads cluster metadata — confirm the wording is intentional.
#
# @return [Kafka] the newly created client
def new_kafka
  client = Kafka.new(
    seed_brokers: brokers,
    client_id: @client_id,
    connect_timeout: connect_timeout,
    ssl_ca_cert: read_file(ssl_ca_cert),
    ssl_client_cert: read_file(ssl_client_cert),
    ssl_client_cert_key: read_file(ssl_client_cert_key),
    ssl_ca_certs_from_system: !!ssl_ca_cert,
    ssl_verify_hostname: ssl_verify_hostname
  )
  log.info "initialized kafka producer: #{@client_id}"
  client
end

#shutdown ⇒ Object



94
95
96
97
# File 'lib/fluent/plugin/in_kafka_status.rb', line 94

# Plugin shutdown hook: lets Fluentd tear down timers via super, then
# closes the cached Kafka connection (best-effort, see #close_kafka).
def shutdown
  super
  close_kafka
end

#start ⇒ Object



86
87
88
89
90
91
92
# File 'lib/fluent/plugin/in_kafka_status.rb', line 86

# Starts the polling timers. When the interval is long (> 60s), an
# extra one-shot timer fires after one second so the first status
# snapshot is not delayed by a full interval.
def start
  super

  poll = method(:kafka_status)
  timer_execute(:kafka_status_first, 1, repeat: false, &poll) if interval > 60

  timer_execute(:kafka_status, interval, repeat: true, &poll)
end