Class: Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener

Inherits:
Object
Includes:
Core::Helpers::Time
Defined in:
lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb

Overview

Kubernetes HTTP liveness listener. It replies as long as the process is not fully hung, and it also lets you define the maximum time allowed for processing and for polling (looping).

Processes such as the Karafka server can hang while still being reachable. For example, if something hangs inside user code, Karafka could stop polling and no new data would be processed, yet the process itself would still be active. This listener lets you define a TTL that is bumped on each poll loop and before and after the processing of each message batch.
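The TTL idea described above can be sketched in plain Ruby. This is only an illustration under stated assumptions, not the listener's actual implementation: `TickTracker`, its method names, and the injectable `clock` are hypothetical and exist purely to show how "bump on activity, report stale after the TTL" might work.

```ruby
# Hypothetical sketch of TTL-based liveness tracking; not Karafka's actual code.
class TickTracker
  # ttl_ms: maximum allowed age of a tick, in milliseconds
  # clock:  callable returning the current time in milliseconds (monotonic by default)
  def initialize(
    ttl_ms:,
    clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) }
  )
    @ttl_ms = ttl_ms
    @clock = clock
    @mutex = Mutex.new
    @ticks = {}
  end

  # Records "now" for the given key (for example, a thread id)
  def mark(key)
    @mutex.synchronize { @ticks[key] = @clock.call }
  end

  # Forgets the key, for example once a batch has been fully processed
  def clear(key)
    @mutex.synchronize { @ticks.delete(key) }
  end

  # True when any recorded tick is older than the TTL
  def stale?
    now = @clock.call
    @mutex.synchronize { @ticks.values.any? { |at| now - at > @ttl_ms } }
  end
end

# Usage with a fake clock so the behavior is easy to follow
fake_now = 0
tracker = TickTracker.new(ttl_ms: 1_000, clock: -> { fake_now })
tracker.mark(:worker)
tracker.stale? # => false, the tick was just recorded
fake_now = 1_500
tracker.stale? # => true, 1500 ms have passed without a new tick
```

Clearing a key after work finishes means an idle worker is never reported as hanging; only work that started and did not finish within the TTL counts as stale.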

Constructor Details

#initialize(hostname: nil, port: 3000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener

Note:

The default TTL matches the default `max.poll.interval.ms`

Returns a new instance of LivenessListener.

Parameters:

  • hostname (String, nil) (defaults to: nil)

    hostname to bind to, or nil to bind on all interfaces

  • port (Integer) (defaults to: 3000)

    TCP port on which we want to run our HTTP status server

  • consuming_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    time in ms after which we consider consumption to be hanging. It defines the max consumption time after which k8s should consider the given process as hanging

  • polling_ttl (Integer) (defaults to: 5 * 60 * 1_000)

    max time in ms between polls. If polling (of any kind) does not happen at least this often, the process should be considered dead.



# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 37

def initialize(
  hostname: nil,
  port: 3000,
  consuming_ttl: 5 * 60 * 1_000,
  polling_ttl: 5 * 60 * 1_000
)
  @server = TCPServer.new(*[hostname, port].compact)
  @polling_ttl = polling_ttl
  @consuming_ttl = consuming_ttl
  @mutex = Mutex.new
  @pollings = {}
  @consumptions = {}

  Thread.new do
    loop do
      break unless respond
    end
  end
end
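The constructor above opens a `TCPServer` and answers requests from a background thread via `respond`, whose body is not shown on this page. A minimal sketch of what such a responder could look like follows. `TinyStatusServer`, the health block, and the chosen status codes (204 when healthy, 500 otherwise) are assumptions made for illustration and should not be read as the listener's actual behavior.

```ruby
require 'socket'

# Hypothetical sketch: answers every HTTP request with a status chosen by a block.
class TinyStatusServer
  attr_reader :port

  # port 0 asks the OS for a free port; the block returns true when healthy
  def initialize(hostname: nil, port: 0, &healthy)
    @server = TCPServer.new(*[hostname, port].compact)
    @port = @server.addr[1]
    @healthy = healthy
    @thread = Thread.new { loop { break unless respond } }
  end

  # Closes the listening socket, which also ends the background loop
  def close
    @server.close
    @thread.join(1)
  end

  private

  # Handles one request; returns false once the server socket has been closed
  def respond
    client = @server.accept
    client.gets # read the request line; the rest of the request is ignored
    status = @healthy.call ? '204 No Content' : '500 Internal Server Error'
    client.print "HTTP/1.1 #{status}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n"
    client.close
    true
  rescue IOError, SystemCallError
    !@server.closed?
  end
end
```

Returning `false` from `respond` only when the server socket is closed lets the loop survive a client that disconnects early, while still exiting cleanly on shutdown.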

Instance Method Details

#on_app_stopped(_event) ⇒ Object

Stops the HTTP server when the process is stopped

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 103

def on_app_stopped(_event)
  @server.close
end

#on_connection_listener_fetch_loop(_event) ⇒ Object

Tick on each fetch

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 59

def on_connection_listener_fetch_loop(_event)
  mark_polling_tick
end
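The `on_*` methods on this page are event callbacks. The sketch below shows one way such callbacks can be dispatched, assuming the convention that a dotted event name maps to a method name by replacing dots with underscores and prefixing `on_`. `TinyNotifier` and `CountingListener` are hypothetical stand-ins, and the event name `connection.listener.fetch_loop` is inferred from the method name rather than taken from this page.

```ruby
# Hypothetical stand-in for an event bus; not the real Karafka monitor.
class TinyNotifier
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  # Publishes an event such as 'connection.listener.fetch_loop' to every
  # listener that defines the matching on_* method
  def publish(event_id, payload = {})
    method_name = :"on_#{event_id.tr('.', '_')}"
    @listeners.each do |listener|
      listener.public_send(method_name, payload) if listener.respond_to?(method_name)
    end
  end
end

# Example listener that counts fetch loop ticks
class CountingListener
  attr_reader :ticks

  def initialize
    @ticks = 0
  end

  def on_connection_listener_fetch_loop(_event)
    @ticks += 1
  end
end
```

In a Karafka application, the listener would typically be attached with `Karafka.monitor.subscribe(listener)`, after which the framework invokes the matching callbacks as events occur.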

#on_consumer_consume(_event) ⇒ Object

Tick on starting work

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 65

def on_consumer_consume(_event)
  mark_consumption_tick
end

#on_consumer_consumed(_event) ⇒ Object

Tick on finished work

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 71

def on_consumer_consumed(_event)
  clear_consumption_tick
end

#on_consumer_revoke(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 76

def on_consumer_revoke(_event)
  mark_consumption_tick
end

#on_consumer_revoked(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 81

def on_consumer_revoked(_event)
  clear_consumption_tick
end

#on_consumer_shutdown(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 91

def on_consumer_shutdown(_event)
  clear_consumption_tick
end

#on_consumer_shutting_down(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 86

def on_consumer_shutting_down(_event)
  mark_consumption_tick
end

#on_error_occurred(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 96

def on_error_occurred(_event)
  clear_consumption_tick
  clear_polling_tick
end