Class: Fluent::Plugin::KafkaStatusInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::KafkaStatusInput
- Includes:
- KafkaStatusUtils
- Defined in:
- lib/fluent/plugin/in_kafka_status.rb
Constant Summary collapse
- NAME =
'kafka_status'- DEFAULT_TAG =
NAME- DEFAULT_BROKERS =
['localhost:9092']
- DEFAULT_CLIENT_ID =
"fluentd:#{NAME}"- DEFAULT_INTERVAL =
3600- DEFAULT_TIMESTAMP_FORMAT =
:iso- DEFAULT_EVENT_PREFIX =
''- MAX_RETRIES =
3
Instance Method Summary collapse
- #close_kafka ⇒ Object
- #configure(conf) ⇒ Object
-
#events_for_cluster_brokers ⇒ Object
metric 4 - brokers_count family: cluster name: brokers_count value: count ??? metric 5 - broker_active family: cluster name: broker_active value: 1 tags_member_host: <host> ???.
-
#events_for_topics ⇒ Object
metric 1 - active topic family: topic name: active value: 1 tags_topic: <topic> metric 2 - partitions_count family: topic name: configured_partitions_count value: X tags_topic: <topic> metric 3 - replica_count family: topic name: replica_count value: count tags_topic: <topic>.
-
#initialize ⇒ KafkaStatusInput
constructor
A new instance of KafkaStatusInput.
- #kafka(max_retries = MAX_RETRIES) ⇒ Object
- #kafka_status ⇒ Object
- #new_kafka ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Methods included from KafkaStatusUtils
Constructor Details
#initialize ⇒ KafkaStatusInput
Returns a new instance of KafkaStatusInput.
69 70 71 72 73 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 69 def initialize super @kafka = nil end |
Instance Method Details
#close_kafka ⇒ Object
213 214 215 216 217 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 213 def close_kafka @kafka&.close @kafka = nil rescue StandardError end |
#configure(conf) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 75 def configure(conf) super raise Fluent::ConfigError, 'tag should not be empty' if tag.nil? || tag.empty? raise Fluent::ConfigError, 'No brokers specified. Need one broker at least.' if brokers.empty? raise Fluent::ConfigError, 'ssl_ca_cert should be a file.' if ssl_ca_cert && !File.file?(ssl_ca_cert) raise Fluent::ConfigError, 'ssl_client_cert should be a file.' if ssl_client_cert && !File.file?(ssl_client_cert) raise Fluent::ConfigError, 'ssl_client_cert_key should be a file.' if ssl_client_cert_key && !File.file?(ssl_client_cert_key) end |
#events_for_cluster_brokers ⇒ Object
metric 4 - brokers_count family: cluster name: brokers_count value: count ??? metric 5 - broker_active family: cluster name: broker_active value: 1 tags_member_host: <host> ???
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 159 def events_for_cluster_brokers kafka do |kafka| events = [] = Time.now.utc.send("to_#{timestamp_format}") brokers = kafka.brokers events.append({ 'timestamp' => , 'metric_family' => 'cluster', 'metric_name' => 'brokers_count', 'metric_value' => brokers.size }) brokers.each do |broker| events.append({ 'timestamp' => , 'metric_family' => 'cluster', 'metric_name' => 'broker_active', 'metric_value' => 1, 'tags_member_host' => broker.host }) end events end end |
#events_for_topics ⇒ Object
metric 1 - active topic family: topic name: active value: 1 tags_topic: <topic> metric 2 - partitions_count family: topic name: configured_partitions_count value: X tags_topic: <topic> metric 3 - replica_count family: topic name: replica_count value: count tags_topic: <topic>
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 122 def events_for_topics kafka do |kafka| events = [] = Time.now.utc.send("to_#{timestamp_format}") kafka.topics.each do |topic| events .append({ 'timestamp' => , 'metric_family' => 'topic', 'metric_name' => 'active', 'metric_value' => 1, "#{event_prefix}topic" => topic }) .append({ 'timestamp' => , 'metric_family' => 'topic', 'metric_name' => 'configured_partitions_count', 'metric_value' => kafka.partitions_for(topic), "#{event_prefix}topic" => topic }) .append({ 'timestamp' => , 'metric_family' => 'topic', 'metric_name' => 'replica_count', 'metric_value' => kafka.replica_count_for(topic), "#{event_prefix}topic" => topic }) end events end end |
#kafka(max_retries = MAX_RETRIES) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 183 def kafka(max_retries = MAX_RETRIES) retry_count = 0 begin @kafka ||= new_kafka yield @kafka if block_given? rescue StandardError => e log.error "exception with kafka connection: #{e}" raise e if (retry_count += 1) > max_retries close_kafka sleep 30 retry end end |
#kafka_status ⇒ Object
99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 99 def kafka_status events = [] events += events_for_topics events += events_for_cluster_brokers time = Fluent::Engine.now events.each { |event| router.emit(tag, time, event) } end |
#new_kafka ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 198 def new_kafka kafka = Kafka.new( seed_brokers: brokers, client_id: @client_id, connect_timeout: connect_timeout, ssl_ca_cert: read_file(ssl_ca_cert), ssl_client_cert: read_file(ssl_client_cert), ssl_client_cert_key: read_file(ssl_client_cert_key), ssl_ca_certs_from_system: !!ssl_ca_cert, ssl_verify_hostname: ssl_verify_hostname ) log.info "initialized kafka producer: #{@client_id}" kafka end |
#shutdown ⇒ Object
94 95 96 97 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 94 def shutdown super close_kafka end |
#start ⇒ Object
86 87 88 89 90 91 92 |
# File 'lib/fluent/plugin/in_kafka_status.rb', line 86 def start super timer_execute(:kafka_status_first, 1, repeat: false, &method(:kafka_status)) if interval > 60 timer_execute(:kafka_status, interval, repeat: true, &method(:kafka_status)) end |