Class: Karafka::Instrumentation::StdoutListener

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/instrumentation/stdout_listener.rb

Overview

Default listener that hooks up to our instrumentation and uses its events for logging It can be removed/replaced or anything without any harm to the Karafka app flow

Constant Summary collapse

USED_LOG_LEVELS =

Log levels that we use in this particular listener

%i[
  debug
  info
  error
  fatal
].freeze

Instance Method Summary collapse

Instance Method Details

#on_app_initializing(_event) ⇒ Object

Logs info that we’re initializing Karafka app

Parameters:

  • _event (Dry::Events::Event)

    event details including payload



108
109
110
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 108

def on_app_initializing(_event)
  info "Initializing Karafka server #{::Process.pid}"
end

#on_app_running(_event) ⇒ Object

Logs info that we’re running Karafka app

Parameters:

  • _event (Dry::Events::Event)

    event details including payload



114
115
116
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 114

def on_app_running(_event)
  info "Running Karafka server #{::Process.pid}"
end

#on_app_stopping(_event) ⇒ Object

Logs info that we’re going to stop the Karafka server

Parameters:

  • _event (Dry::Events::Event)

    event details including payload



120
121
122
123
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 120

def on_app_stopping(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { info "Stopping Karafka server #{::Process.pid}" }
end

#on_app_stopping_error(_event) ⇒ Object

Logs an error that Karafka was unable to stop the server gracefully and it had to do a

forced exit

Parameters:

  • _event (Dry::Events::Event)

    event details including payload



128
129
130
131
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 128

def on_app_stopping_error(_event)
  # We use a separate thread as logging can't be called from trap context
  Thread.new { error "Forceful Karafka server #{::Process.pid} stop" }
end

#on_backends_inline_process(event) ⇒ Object

Logs info about processing of a certain dataset with an inline backend

Parameters:

  • event (Dry::Events::Event)

    event details including payload



84
85
86
87
88
89
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 84

def on_backends_inline_process(event)
  count = event[:caller].send(:params_batch).to_a.size
  topic = event[:caller].topic.name
  time = event[:time]
  info "Inline processing of topic #{topic} with #{count} messages took #{time} ms"
end

#on_connection_batch_delegator_call(event) ⇒ Object

Logs details about incoming batches and with which consumer we will consume them

Parameters:

  • event (Dry::Events::Event)

    event details including payload



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 18

def on_connection_batch_delegator_call(event)
  consumer = event[:consumer]
  topic = consumer.topic.name
  kafka_messages = event[:kafka_batch].messages
  info(
    <<~MSG.chomp.tr("\n", ' ')
      #{kafka_messages.count} messages
      on #{topic} topic
      delegated to #{consumer.class}
    MSG
  )
end

#on_connection_client_fetch_loop_error(event) ⇒ Object

Note:

Karafka will attempt to reconnect, so an error not a fatal

Logs errors that are related to the connection itself

Parameters:

  • event (Dry::Events::Event)

    event details including payload



70
71
72
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 70

def on_connection_client_fetch_loop_error(event)
  error "Client fetch loop error: #{event[:error]}"
end

#on_connection_listener_fetch_loop_error(event) ⇒ Object

Note:

It’s an error as we can recover from it not a fatal

Logs errors that occurred in a listener fetch loop

Parameters:

  • event (Dry::Events::Event)

    event details including payload



63
64
65
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 63

def on_connection_listener_fetch_loop_error(event)
  error "Listener fetch loop error: #{event[:error]}"
end

#on_connection_message_delegator_call(event) ⇒ Object

Logs details about incoming message and with which consumer we will consume it

Parameters:

  • event (Dry::Events::Event)

    event details including payload



33
34
35
36
37
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 33

def on_connection_message_delegator_call(event)
  consumer = event[:consumer]
  topic = consumer.topic.name
  info "1 message on #{topic} topic delegated to #{consumer.class}"
end

#on_consumers_responders_respond_with(event) ⇒ Object

Logs info about responder usage withing a controller flow

Parameters:

  • event (Dry::Events::Event)

    event details including payload



99
100
101
102
103
104
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 99

def on_consumers_responders_respond_with(event)
  calling = event[:caller]
  responder = calling.topic.responder
  data = event[:data]
  info "Responded from #{calling.class} using #{responder} with following data #{data}"
end

#on_fetcher_call_error(event) ⇒ Object

Note:

If this happens, Karafka will shutdown as it means a critical error in one of the threads

Logs info about crashed fetcher

Parameters:

  • event (Dry::Events::Event)

    event details including payload



78
79
80
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 78

def on_fetcher_call_error(event)
  fatal "Fetcher crash due to an error: #{event[:error]}"
end

#on_params_params_deserialize(event) ⇒ Object

Logs details about each received message value deserialization

Parameters:

  • event (Dry::Events::Event)

    event details including payload



41
42
43
44
45
46
47
48
49
50
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 41

def on_params_params_deserialize(event)
  # Keep in mind, that a caller here is a param object not a controller,
  # so it returns a topic as a string, not a routing topic
  debug(
    <<~MSG.chomp.tr("\n", ' ')
      Params deserialization for #{event[:caller]..topic} topic
      successful in #{event[:time]} ms
    MSG
  )
end

#on_params_params_deserialize_error(event) ⇒ Object

Logs unsuccessful deserialization attempts of incoming data

Parameters:

  • event (Dry::Events::Event)

    event details including payload



54
55
56
57
58
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 54

def on_params_params_deserialize_error(event)
  topic = event[:caller]..topic
  error = event[:error]
  error "Params deserialization error for #{topic} topic: #{error}"
end

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received

Parameters:

  • event (Dry::Events::Event)

    event details including payload



93
94
95
# File 'lib/karafka/instrumentation/stdout_listener.rb', line 93

def on_process_notice_signal(event)
  info "Received #{event[:signal]} system signal"
end