Module: Karafka::Instrumentation::Listener
- Extended by:
- WaterDrop::Instrumentation::Listener
- Defined in:
- lib/karafka/instrumentation/listener.rb
Overview
Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the Karafka app flow.
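As a rough illustration of what replacing the listener could look like, here is a minimal sketch. `ErrorOnlyListener` and `MiniMonitor` are hypothetical names invented for this example. `MiniMonitor` is only a stand-in for the instrumentation monitor and is not Karafka's API, and the mapping from dotted event names to `on_*` methods is an assumption inferred from the method names on this page.

```ruby
require 'logger'
require 'stringio'

# Hypothetical replacement listener: it follows the same on_<event> naming
# as the default listener, but only reports parsing errors.
module ErrorOnlyListener
  extend self

  attr_writer :logger

  def logger
    @logger ||= Logger.new($stdout)
  end

  def on_params_params_parse_error(event)
    logger.error "Params parsing error for #{event[:caller].topic} topic: #{event[:error]}"
  end
end

# Hypothetical stand-in for the instrumentation monitor (NOT Karafka's API).
class MiniMonitor
  def initialize
    @listeners = []
  end

  def subscribe(listener)
    @listeners << listener
  end

  # Assumed mapping: "params.params.parse_error" => on_params_params_parse_error.
  # Listeners that do not implement the handler are skipped.
  def publish(event_name, payload = {})
    handler = "on_#{event_name.tr('.', '_')}"
    @listeners.each do |listener|
      listener.public_send(handler, payload) if listener.respond_to?(handler)
    end
  end
end

output = StringIO.new
ErrorOnlyListener.logger = Logger.new(output)

monitor = MiniMonitor.new
monitor.subscribe(ErrorOnlyListener)

# Stand-in for a params object that exposes its topic as a string.
FakeParams = Struct.new(:topic)
monitor.publish('params.params.parse_error', caller: FakeParams.new('events'), error: 'unexpected token')
monitor.publish('server.stop') # skipped: this listener has no on_server_stop
```

Because the listener only reacts to events it has handlers for, swapping it in changes what gets logged without touching message processing.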
Constant Summary
- USED_LOG_LEVELS = %i[debug info error fatal].freeze
Log levels that we use in this particular listener.
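The handlers documented on this page call bare `info`, `error`, `debug` and `fatal` helpers. This page does not show how those helpers are defined, so the following is only a sketch of one way such per-level helpers could be generated from a list of levels. `SketchListener` and its `logger` accessor are hypothetical names, and the standard library `Logger` is used in place of Karafka's logger.

```ruby
require 'logger'
require 'stringio'

# Hypothetical module for illustration only; not the Karafka implementation.
module SketchListener
  USED_LOG_LEVELS = %i[debug info error fatal].freeze

  class << self
    attr_accessor :logger
  end

  extend self

  # Define one helper per level (debug, info, error, fatal), each delegating
  # to the configured logger.
  USED_LOG_LEVELS.each do |level|
    define_method(level) { |*args| logger.public_send(level, *args) }
  end

  def on_process_notice_signal(event)
    info "Received #{event[:signal]} system signal"
  end
end

buffer = StringIO.new
SketchListener.logger = Logger.new(buffer)
SketchListener.on_process_notice_signal(signal: 'SIGINT')
```

Keeping the level list in one constant means adding a level only requires extending the array.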
Class Method Summary
-
.on_backends_inline_process(event) ⇒ Object
Logs info about processing of a certain dataset with an inline backend.
-
.on_connection_client_fetch_loop_error(event) ⇒ Object
Logs errors that are related to the connection itself.
-
.on_connection_delegator_call(event) ⇒ Object
Logs details about incoming messages and with which consumer we will consume them.
-
.on_connection_listener_fetch_loop_error(event) ⇒ Object
Logs errors that occurred in a listener fetch loop.
-
.on_consumers_responders_respond_with(event) ⇒ Object
Logs info about responder usage within a controller flow.
-
.on_fetcher_call_error(event) ⇒ Object
Logs info about a crashed fetcher.
-
.on_params_params_parse(event) ⇒ Object
Logs details about each received message value parsing.
-
.on_params_params_parse_error(event) ⇒ Object
Logs unsuccessful parsing attempts of incoming data.
-
.on_process_notice_signal(event) ⇒ Object
Logs info about system signals that Karafka received.
-
.on_server_stop(_event) ⇒ Object
Logs info that we’re going to stop the Karafka server.
-
.on_server_stop_error(_event) ⇒ Object
Logs an error that Karafka was unable to stop the server gracefully and it had to do a forced exit.
Class Method Details
.on_backends_inline_process(event) ⇒ Object
Logs info about processing of a certain dataset with an inline backend
# File 'lib/karafka/instrumentation/listener.rb', line 67
def on_backends_inline_process(event)
  count = event[:caller].send(:params_batch).to_a.size
  topic = event[:caller].topic.name
  time = event[:time]
  info "Inline processing of topic #{topic} with #{count} messages took #{time} ms"
end
.on_connection_client_fetch_loop_error(event) ⇒ Object
Logs errors that are related to the connection itself.
Note: Karafka will attempt to reconnect, so this is logged as an error, not as a fatal.
# File 'lib/karafka/instrumentation/listener.rb', line 53
def on_connection_client_fetch_loop_error(event)
  error "Client fetch loop error: #{event[:error]}"
end
.on_connection_delegator_call(event) ⇒ Object
Logs details about incoming messages and with which consumer we will consume them
# File 'lib/karafka/instrumentation/listener.rb', line 22
def on_connection_delegator_call(event)
  consumer = event[:consumer]
  topic = consumer.topic.name
  kafka_messages = event[:kafka_messages]
  info "#{kafka_messages.count} messages on #{topic} topic delegated to #{consumer.class}"
end
.on_connection_listener_fetch_loop_error(event) ⇒ Object
Logs errors that occurred in a listener fetch loop.
Note: This is logged as an error rather than as a fatal because we can recover from it.
# File 'lib/karafka/instrumentation/listener.rb', line 46
def on_connection_listener_fetch_loop_error(event)
  error "Listener fetch loop error: #{event[:error]}"
end
.on_consumers_responders_respond_with(event) ⇒ Object
Logs info about responder usage within a controller flow
# File 'lib/karafka/instrumentation/listener.rb', line 82
def on_consumers_responders_respond_with(event)
  calling = event[:caller].class
  responder = calling.topic.responder
  data = event[:data]
  info "Responded from #{calling} using #{responder} with following data #{data}"
end
.on_fetcher_call_error(event) ⇒ Object
Logs info about a crashed fetcher.
Note: If this happens, Karafka will shut down, as it means a critical error occurred in one of the threads.
# File 'lib/karafka/instrumentation/listener.rb', line 61
def on_fetcher_call_error(event)
  fatal "Fetcher crash due to an error: #{event[:error]}"
end
.on_params_params_parse(event) ⇒ Object
Logs details about each received message value parsing
# File 'lib/karafka/instrumentation/listener.rb', line 31
def on_params_params_parse(event)
  # Keep in mind, that a caller here is a param object not a controller,
  # so it returns a topic as a string, not a routing topic
  debug "Params parsing for #{event[:caller].topic} topic successful in #{event[:time]} ms"
end
.on_params_params_parse_error(event) ⇒ Object
Logs unsuccessful parsing attempts of incoming data
# File 'lib/karafka/instrumentation/listener.rb', line 39
def on_params_params_parse_error(event)
  error "Params parsing error for #{event[:caller].topic} topic: #{event[:error]}"
end
.on_process_notice_signal(event) ⇒ Object
Logs info about system signals that Karafka received
# File 'lib/karafka/instrumentation/listener.rb', line 76
def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"
end
.on_server_stop(_event) ⇒ Object
Logs info that we’re going to stop the Karafka server
# File 'lib/karafka/instrumentation/listener.rb', line 91
def on_server_stop(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { info "Stopping Karafka server #{::Process.pid}" }
end
.on_server_stop_error(_event) ⇒ Object
Logs an error that Karafka was unable to stop the server gracefully and it had to do a forced exit
# File 'lib/karafka/instrumentation/listener.rb', line 99
def on_server_stop_error(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { error "Forceful Karafka server #{::Process.pid} stop" }
end
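The two server stop handlers log from a new thread because they may run inside a signal handler. The sketch below shows that pattern in isolation, assuming MRI Ruby on a POSIX system. The `USR1` signal, the standard library `Logger` and the local variable names are choices made for this example and are not taken from Karafka.

```ruby
require 'logger'
require 'stringio'

log_output = StringIO.new
logger = Logger.new(log_output)
worker = nil

# Signal handlers run in a restricted "trap context" in which mutex-based
# code (which Logger relies on) is not permitted. The handler therefore
# only spawns a thread, and the thread does the logging.
previous_handler = Signal.trap('USR1') do
  worker = Thread.new { logger.info("Stopping server #{Process.pid}") }
end

Process.kill('USR1', Process.pid)

# Wait (up to about 2 seconds) for the handler to run, then for the thread.
200.times do
  break if worker

  sleep 0.01
end
worker&.join

# Restore whatever handler was installed before this example ran.
Signal.trap('USR1', previous_handler || 'DEFAULT')
```

The spawned thread is not joined inside the handler itself, so the handler returns immediately and the log line is written shortly afterwards.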