Class: Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener
- Inherits:
-
Object
- Object
- Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener
- Includes:
- Core::Helpers::Time
- Defined in:
- lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb
Overview
Kubernetes HTTP listener that does not only reply when process is not fully hanging, but also allows to define max time of processing and looping.
Processes like Karafka server can hang while still being reachable. For example, in case something would hang inside of the user code, Karafka could stop polling and no new data would be processed, but process itself would still be active. This listener allows for defining of a ttl that gets bumped on each poll loop and before and after processing of a given messages batch.
Instance Method Summary collapse
-
#initialize(hostname: nil, port: 3000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener
constructor
A new instance of LivenessListener.
-
#on_app_stopped(_event) ⇒ Object
Stop the http server when we stop the process.
-
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch.
-
#on_consumer_consume(_event) ⇒ Object
Tick on starting work.
-
#on_consumer_consumed(_event) ⇒ Object
Tick on finished work.
- #on_consumer_revoke(_event) ⇒ Object
- #on_consumer_revoked(_event) ⇒ Object
- #on_consumer_shutdown(_event) ⇒ Object
- #on_consumer_shutting_down(_event) ⇒ Object
- #on_error_occurred(_event) ⇒ Object
Constructor Details
#initialize(hostname: nil, port: 3000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener
The default TTL matches the default ‘max.poll.interval.ms`
Returns a new instance of LivenessListener.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 37 def initialize( hostname: nil, port: 3000, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000 ) @server = TCPServer.new(*[hostname, port].compact) @polling_ttl = polling_ttl @consuming_ttl = consuming_ttl @mutex = Mutex.new @pollings = {} @consumptions = {} Thread.new do loop do break unless respond end end end |
Instance Method Details
#on_app_stopped(_event) ⇒ Object
Stop the http server when we stop the process
103 104 105 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 103 def on_app_stopped(_event) @server.close end |
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch
59 60 61 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 59 def on_connection_listener_fetch_loop(_event) mark_polling_tick end |
#on_consumer_consume(_event) ⇒ Object
Tick on starting work
65 66 67 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 65 def on_consumer_consume(_event) mark_consumption_tick end |
#on_consumer_consumed(_event) ⇒ Object
Tick on finished work
71 72 73 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 71 def on_consumer_consumed(_event) clear_consumption_tick end |
#on_consumer_revoke(_event) ⇒ Object
76 77 78 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 76 def on_consumer_revoke(_event) mark_consumption_tick end |
#on_consumer_revoked(_event) ⇒ Object
81 82 83 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 81 def on_consumer_revoked(_event) clear_consumption_tick end |
#on_consumer_shutdown(_event) ⇒ Object
91 92 93 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 91 def on_consumer_shutdown(_event) clear_consumption_tick end |
#on_consumer_shutting_down(_event) ⇒ Object
86 87 88 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 86 def on_consumer_shutting_down(_event) mark_consumption_tick end |
#on_error_occurred(_event) ⇒ Object
96 97 98 99 |
# File 'lib/karafka/instrumentation/vendors/kubernetes/liveness_listener.rb', line 96 def on_error_occurred(_event) clear_consumption_tick clear_polling_tick end |