Class: HttpHealthCheck::Probes::RubyKafka

Inherits:
Object
  • Object
show all
Includes:
HttpHealthCheck::Probe
Defined in:
lib/http_health_check/probes/ruby_kafka.rb

Defined Under Namespace

Classes: Heartbeat

Instance Method Summary collapse

Methods included from HttpHealthCheck::Probe

#call, #meta, #probe_error, #probe_ok, #with_error_handler

Constructor Details

#initialize(opts = {}) ⇒ RubyKafka

Returns a new instance of RubyKafka.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/http_health_check/probes/ruby_kafka.rb', line 9

def initialize(opts = {})
  @heartbeat_event_name = opts.fetch(:heartbeat_event_name, /heartbeat.consumer.kafka/)
  @heartbeat_interval_sec = opts.fetch(:heartbeat_interval_sec, 10)
  @verbose = opts.fetch(:verbose, false)
  @consumer_groups = opts.fetch(:consumer_groups)
                         .each_with_object(Hash.new(0)) { |group, hash| hash[group] += 1 }
  @heartbeats = {}
  @timer = opts.fetch(:timer, Time)

  setup_subscriptions
end

Instance Method Details

#probe(_env) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/http_health_check/probes/ruby_kafka.rb', line 21

def probe(_env)
  now = @timer.now
  failed_heartbeats = select_failed_heartbeats(now)
  return probe_ok groups: meta_from_heartbeats(@heartbeats, now) if failed_heartbeats.empty?

  probe_error failed_groups: meta_from_heartbeats(failed_heartbeats, now)
end