Class: Karafka::Instrumentation::LoggerListener

Inherits: Object
Defined in:
lib/karafka/instrumentation/logger_listener.rb

Overview

Default listener that hooks up to our instrumentation and uses its events for logging. It can be removed or replaced without any harm to the Karafka app flow.
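
Because it is a plain subscriber, replacing it is just a matter of subscribing something else. A minimal sketch, assuming the usual Karafka.monitor subscription done in karafka.rb; QuietLoggerListener is a hypothetical subclass:

class QuietLoggerListener < Karafka::Instrumentation::LoggerListener
  # No-op override: skip the per-job "started" log line
  def on_worker_process(_event); end
end

Karafka.monitor.subscribe(QuietLoggerListener.new)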

Instance Method Summary

Constructor Details

#initialize(log_polling: true) ⇒ LoggerListener

Returns a new instance of LoggerListener.

Parameters:

  • log_polling (Boolean) (defaults to: true)

should we log the fact that messages are being polled. This is usually noisy and not useful in production, but can be useful in development. While users can log this themselves, it has been requested often enough that, similar to how extensive logging can be disabled in WaterDrop, we support disabling it here as well.



# File 'lib/karafka/instrumentation/logger_listener.rb', line 23

def initialize(log_polling: true)
  @log_polling = log_polling
end
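
A minimal subscription sketch with the polling logs disabled, assuming the standard Karafka.monitor API:

# Silences the on_connection_listener_fetch_loop and
# on_connection_listener_fetch_loop_received log lines
Karafka.monitor.subscribe(
  Karafka::Instrumentation::LoggerListener.new(log_polling: false)
)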

Instance Method Details

#on_app_quiet(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 204

def on_app_quiet(event)
  info "[#{event[:server_id]}] Reached quiet mode. No messages will be processed anymore"
end

#on_app_quieting(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 199

def on_app_quieting(event)
  info "[#{event[:server_id]}] Switching to quiet mode. New messages will not be processed"
end

#on_app_running(event) ⇒ Object

Logs info that we're running the Karafka app.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 187

def on_app_running(event)
  server_id = event[:server_id]

  info "[#{server_id}] Running in #{RUBY_DESCRIPTION}"
  info "[#{server_id}] Running Karafka #{Karafka::VERSION} server"

  return if Karafka.pro?

  info "[#{server_id}] See LICENSE and the LGPL-3.0 for licensing details"
end

#on_app_stopped(event) ⇒ Object

Logs info that we stopped the Karafka server.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 218

def on_app_stopped(event)
  info "[#{event[:server_id]}] Stopped Karafka server"
end

#on_app_stopping(event) ⇒ Object

Logs info that we’re going to stop the Karafka server.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 211

def on_app_stopping(event)
  info "[#{event[:server_id]}] Stopping Karafka server"
end

#on_client_pause(event) ⇒ Object

Note:

There may be no offset provided when the user wants to pause on the consecutive offset position. This can be beneficial when we do not want to purge the buffers.

Prints info about a consumer pause occurrence, regardless of whether it was user- or system-initiated.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 101

def on_client_pause(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  client = event[:caller]

  info "    [\#{client.id}]\n    Pausing on topic \#{topic}-\#{partition}\n    on \#{offset ? \"offset \#{offset}\" : 'the consecutive offset'}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_client_resume(event) ⇒ Object

Prints information about resuming processing of a given topic partition.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 117

def on_client_resume(event)
  topic = event[:topic]
  partition = event[:partition]
  client = event[:caller]

  info "    [\#{client.id}] Resuming on topic \#{topic}-\#{partition}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_connection_listener_before_fetch_loop(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 28

def on_connection_listener_before_fetch_loop(event)
  listener_id = event[:caller].id
  subscription_group = event[:subscription_group]
  consumer_group_id = subscription_group.consumer_group.id
  topics = subscription_group.topics.select(&:active?).map(&:name).join(', ')
  group_details = "#{consumer_group_id}/#{subscription_group.id}"

  info(
    "[#{listener_id}] Group #{group_details} subscribing to topics: #{topics}"
  )
end

#on_connection_listener_fetch_loop(event) ⇒ Object

Logs each message-fetching attempt.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 43

def on_connection_listener_fetch_loop(event)
  return unless log_polling?

  listener_id = event[:caller].id
  debug "[#{listener_id}] Polling messages..."
end

#on_connection_listener_fetch_loop_received(event) ⇒ Object

Logs info about messages that we've received from Kafka.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 53

def on_connection_listener_fetch_loop_received(event)
  return unless log_polling?

  listener_id = event[:caller].id
  time = event[:time].round(2)
  messages_count = event[:messages_buffer].size

  message = "[#{listener_id}] Polled #{messages_count} messages in #{time}ms"

  # We don't want the "polled 0" in dev as it would spam the log
  # Instead we publish only info when there was anything we could poll and fail over to the
  # zero notifications when in debug mode
  messages_count.zero? ? debug(message) : info(message)
end
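
Since zero-message polls are logged at debug level only, seeing them requires a debug-level logger. A minimal sketch, assuming a stdlib Logger is assigned during setup:

require 'logger'

class KarafkaApp < Karafka::App
  setup do |config|
    # DEBUG reveals the "Polled 0 messages" lines; INFO hides them
    config.logger = Logger.new($stdout, level: Logger::DEBUG)
  end
end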

#on_consumer_consuming_retry(event) ⇒ Object

Prints info about a processing retry after an error.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 130

def on_consumer_consuming_retry(event)
  topic = event[:topic]
  partition = event[:partition]
  offset = event[:offset]
  consumer = event[:caller]
  timeout = event[:timeout]

  info "    [\#{consumer.id}] Retrying of \#{consumer.class} after \#{timeout} ms\n    on topic \#{topic}-\#{partition} from offset \#{offset}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_consumer_consuming_seek(event) ⇒ Object

Prints info about seeking to a particular location

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 146

def on_consumer_consuming_seek(event)
  topic = event[:topic]
  partition = event[:partition]
  seek_offset = event[:message].offset
  consumer = event[:caller]

  info "    [\#{consumer.id}] Seeking from \#{consumer.class}\n    on topic \#{topic}-\#{partition} to offset \#{seek_offset}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_dead_letter_queue_dispatched(event) ⇒ Object

Logs info when we have dispatched a message to the DLQ.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 261

def on_dead_letter_queue_dispatched(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  message = event[:message]
  offset = message.offset
  dlq_topic = consumer.topic.dead_letter_queue.topic
  partition = message.partition

  info "    [\#{consumer.id}] Dispatched message \#{offset}\n    from \#{topic}-\#{partition}\n    to DLQ topic: \#{dlq_topic}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_error_occurred(event) ⇒ Object

There are many types of errors that can occur in many places, but we provide a single handler for all of them to simplify error instrumentation.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 340

def on_error_occurred(event)
  type = event[:type]
  error = event[:error]
  backtrace = (error.backtrace || []).join("\n")

  details = [error.to_s, error_details(event)].compact.join(' ')

  case type
  when 'consumer.initialized.error'
    error "Consumer initialized error: #{details}"
    error backtrace
  when 'consumer.wrap.error'
    error "Consumer wrap failed due to an error: #{details}"
    error backtrace
  when 'consumer.consume.error'
    error "Consumer consuming error: #{details}"
    error backtrace
  when 'consumer.revoked.error'
    error "Consumer on revoked failed due to an error: #{details}"
    error backtrace
  when 'consumer.idle.error'
    error "Consumer idle failed due to an error: #{details}"
    error backtrace
  when 'consumer.shutdown.error'
    error "Consumer on shutdown failed due to an error: #{details}"
    error backtrace
  when 'consumer.tick.error'
    error "Consumer on tick failed due to an error: #{details}"
    error backtrace
  when 'consumer.eofed.error'
    error "Consumer on eofed failed due to an error: #{details}"
    error backtrace
  when 'consumer.after_consume.error'
    error "Consumer on after_consume failed due to an error: #{details}"
    error backtrace
  when 'worker.process.error'
    fatal "Worker processing failed due to an error: #{details}"
    fatal backtrace
  when 'connection.listener.fetch_loop.error'
    error "Listener fetch loop error: #{details}"
    error backtrace
  when 'swarm.supervisor.error'
    fatal "Swarm supervisor crashed due to an error: #{details}"
    fatal backtrace
  when 'runner.call.error'
    fatal "Runner crashed due to an error: #{details}"
    fatal backtrace
  when 'app.stopping.error'
    # Counts number of workers and listeners that were still active when forcing the
    # shutdown. Please note, that unless all listeners are closed, workers will not finalize
    # their operations as well.
    # We need to check if listeners and workers are assigned as during super early stages of
    # boot they are not.
    listeners = Server.listeners ? Server.listeners.count(&:active?) : 0
    workers = Server.workers ? Server.workers.count(&:alive?) : 0

    message = "      Forceful Karafka server stop with:\n      \#{workers} active workers and\n      \#{listeners} active listeners\n    MSG\n\n    error message\n  when 'app.forceful_stopping.error'\n    error \"Forceful shutdown error occurred: \#{details}\"\n    error backtrace\n  when 'librdkafka.error'\n    error \"librdkafka internal error occurred: \#{details}\"\n    error backtrace\n  # Those can occur when emitted statistics are consumed by the end user and the processing\n  # of statistics fails. The statistics are emitted from librdkafka main loop thread and\n  # any errors there crash the whole thread\n  when 'callbacks.statistics.error'\n    error \"callbacks.statistics processing failed due to an error: \#{details}\"\n    error backtrace\n  when 'callbacks.error.error'\n    error \"callbacks.error processing failed due to an error: \#{details}\"\n    error backtrace\n  # Those will only occur when retries in the client fail and when they did not stop after\n  # back-offs\n  when 'connection.client.poll.error'\n    error \"Data polling error occurred: \#{details}\"\n    error backtrace\n  when 'connection.client.rebalance_callback.error'\n    error \"Rebalance callback error occurred: \#{details}\"\n    error backtrace\n  when 'connection.client.unsubscribe.error'\n    error \"Client unsubscribe error occurred: \#{details}\"\n    error backtrace\n  when 'parallel_segments.reducer.error'\n    error \"Parallel segments reducer error occurred: \#{details}\"\n    error backtrace\n  when 'parallel_segments.partitioner.error'\n    error \"Parallel segments partitioner error occurred: \#{details}\"\n    error backtrace\n  when 'virtual_partitions.partitioner.error'\n    error \"Virtual partitions partitioner error occurred: \#{details}\"\n    error backtrace\n  # This handles any custom errors coming from places like Web-UI, etc\n  else\n    error \"\#{type} error occurred: \#{error.class} - \#{details}\"\n    error backtrace\n  end\nend\n".tr("\n", ' ').strip!

#on_filtering_seek(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 295

def on_filtering_seek(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Message to which we seek
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info "    [\#{consumer.id}] Post-filtering seeking to message \#{offset}\n    on \#{topic}-\#{partition}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_filtering_throttled(event) ⇒ Object

Logs info about a throttling event.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 279

def on_filtering_throttled(event)
  consumer = event[:caller]
  topic = consumer.topic.name
  # Here we get last message before throttle
  message = event[:message]
  partition = message.partition
  offset = message.offset

  info "    [\#{consumer.id}] Throttled and will resume\n    from message \#{offset}\n    on \#{topic}-\#{partition}\n  MSG\nend\n".tr("\n", ' ').strip!

#on_process_notice_signal(event) ⇒ Object

Logs info about system signals that Karafka received and prints backtraces of all threads in case of TTIN.

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 162

def on_process_notice_signal(event)
  server_id = Karafka::Server.id
  info "[#{server_id}] Received #{event[:signal]} system signal"

  # We print backtrace only for ttin
  return unless event[:signal] == :SIGTTIN

  # Inspired by Sidekiq
  Thread.list.each do |thread|
    tid = (thread.object_id ^ ::Process.pid).to_s(36)

    warn ''
    warn "Thread TID-#{tid} #{thread.name}"

    if thread.backtrace && !thread.backtrace.empty?
      warn thread.backtrace.join("\n")
    else
      warn '<no backtrace available>'
    end
  end
end
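
To obtain the thread dump, send TTIN to a running Karafka server process. A minimal sketch; the pid value is a hypothetical placeholder:

# Hypothetical pid of a running `karafka server` process
karafka_server_pid = 12_345

# Equivalent to `kill -TTIN 12345` from a shell; each thread's name and
# backtrace is printed to the Karafka log by the handler above
Process.kill('TTIN', karafka_server_pid)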

#on_rebalance_partitions_assigned(event) ⇒ Object

Logs info about partitions that we’ve gained

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details with assigned partitions



# File 'lib/karafka/instrumentation/logger_listener.rb', line 243

def on_rebalance_partitions_assigned(event)
  assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  group_id = event[:consumer_group_id]
  client_id = event[:client_id]
  group_prefix = "[#{client_id}] Group #{group_id} rebalance"

  if assigned_partitions.empty?
    info "#{group_prefix}: No partitions assigned"
  else
    assigned_partitions.each do |topic, partitions|
      info "#{group_prefix}: #{topic}-[#{partitions.join(',')}] assigned"
    end
  end
end
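
Illustrative log output based on the format above (identifiers and topic names are made up):

[dd9a64ce8f12] Group example_group rebalance: orders-[0,1,2] assigned
[dd9a64ce8f12] Group example_group rebalance: payments-[3] assigned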

#on_rebalance_partitions_revoked(event) ⇒ Object

Logs info about partitions we have lost

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details with revoked partitions



# File 'lib/karafka/instrumentation/logger_listener.rb', line 225

def on_rebalance_partitions_revoked(event)
  revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  group_id = event[:consumer_group_id]
  client_id = event[:client_id]
  group_prefix = "[#{client_id}] Group #{group_id} rebalance"

  if revoked_partitions.empty?
    info "#{group_prefix}: No partitions revoked"
  else
    revoked_partitions.each do |topic, partitions|
      info "#{group_prefix}: #{topic}-[#{partitions.join(',')}] revoked"
    end
  end
end

#on_swarm_manager_before_fork(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 322

def on_swarm_manager_before_fork(event)
  debug "Swarm manager starting node with id: #{event[:node].id}"
end

#on_swarm_manager_control(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 332

def on_swarm_manager_control(event)
  pids = event[:caller].nodes.map(&:pid).join(', ')
  debug "Swarm manager checking nodes: #{pids}"
end

#on_swarm_manager_stopping(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 310

def on_swarm_manager_stopping(event)
  node = event[:node]
  error "Swarm manager detected unhealthy node #{node.pid}. Sending TERM signal..."
end

#on_swarm_manager_terminating(event) ⇒ Object

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 316

def on_swarm_manager_terminating(event)
  node = event[:node]
  error "Swarm manager detected unresponsive node #{node.pid}. Sending KILL signal..."
end

#on_swarm_node_after_fork(_event) ⇒ Object

Parameters:

  • _event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 327

def on_swarm_node_after_fork(_event)
  info "Swarm node #{::Process.pid} forked from #{::Process.ppid}"
end

#on_worker_process(event) ⇒ Object

Prints info about the fact that a given job has started

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 71

def on_worker_process(event)
  job = event[:job]
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info "[#{job.id}] #{job_type} job for #{consumer} on #{topic}-#{partition} started"
end

#on_worker_processed(event) ⇒ Object

Prints info about the fact that a given job has finished

Parameters:

  • event (Karafka::Core::Monitoring::Event)

    event details including payload



# File 'lib/karafka/instrumentation/logger_listener.rb', line 83

def on_worker_processed(event)
  job = event[:job]
  time = event[:time].round(2)
  job_type = job.class.to_s.split('::').last
  consumer = job.executor.topic.consumer
  topic = job.executor.topic.name
  partition = job.executor.partition
  info "    [\#{job.id}] \#{job_type} job for \#{consumer}\n    on \#{topic}-\#{partition} finished in \#{time} ms\n  MSG\nend\n".tr("\n", ' ').strip!