Class: Tasker::Events::Subscribers::TelemetrySubscriber

Inherits:
BaseSubscriber
  • Object
show all
Defined in:
lib/tasker/events/subscribers/telemetry_subscriber.rb

Overview

TelemetrySubscriber handles telemetry events for observability

This subscriber creates OpenTelemetry spans with proper hierarchical context for distributed tracing in systems like Jaeger. It follows OpenTelemetry best practices by creating detailed spans for debugging while allowing metrics to be derived from span data.

Architecture Decision:

  • SPANS: Individual trace records for detailed debugging (this class)

  • METRICS: Aggregated data for dashboards/alerts (derived from spans or separate collectors)

Constant Summary collapse

EVENT_ANNOTATION_MAP =

Event type to annotation name mapping

{
  'initialize_requested' => 'task.initialize',
  'execution_requested' => 'step.queued',
  'retry_requested' => 'step.retry'
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from BaseSubscriber

filter_events, subscribe, subscribe_to, #subscribe_to_publisher

Constructor Details

#initializeTelemetrySubscriber

Returns a new instance of TelemetrySubscriber.



32
33
34
35
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 32

def initialize
  super
  @tracer = create_tracer
end

Instance Attribute Details

#tracerObject

Returns the value of attribute tracer.



20
21
22
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 20

def tracer
  @tracer
end

Instance Method Details

#convert_attributes_for_otel(attributes) ⇒ Hash

Convert event attributes to OpenTelemetry format

Parameters:

  • attributes (Hash)

    The attributes to convert

Returns:

  • (Hash)

    OpenTelemetry-compatible attributes



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 310

def convert_attributes_for_otel(attributes)
  result = {}
  config = Tasker::Configuration.configuration
  service_name = config.telemetry.service_name

  # Filter sensitive data first
  filtered_attributes = filter_sensitive_attributes(attributes)

  # Ensure attributes are properly formatted for OpenTelemetry
  filtered_attributes.each do |key, value|
    # Skip exception_object as it can't be properly serialized
    next if key == :exception_object

    # Use the configured service name as prefix for all attributes
    attr_prefix = "#{service_name}."
    attr_key = key.to_s.start_with?(attr_prefix) ? key.to_s : "#{attr_prefix}#{key}"

    # Convert values based on their type
    result[attr_key] = convert_value_for_otel(value)
  end

  result
end

#create_simple_span(event, span_name, attributes) ⇒ Object

Create a simple span for events that don’t need complex hierarchy

Parameters:

  • event (Hash)

    The event data

  • span_name (String)

    The name for the span

  • attributes (Hash)

    Span attributes



174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 174

def create_simple_span(event, span_name, attributes)
  return unless opentelemetry_available?

  otel_attributes = convert_attributes_for_otel(attributes)

  tracer.in_span(span_name, attributes: otel_attributes) do |span|
    # Add event annotation
    span.add_event(event_to_annotation_name(event), attributes: otel_attributes)
  end
rescue StandardError => e
  Rails.logger.warn("Failed to create simple span: #{e.message}")
end

#create_step_span(event, span_name, attributes, status) ⇒ Object

Create a step span as a child of the task span

Parameters:

  • event (Hash)

    The event data

  • span_name (String)

    The name for the span

  • attributes (Hash)

    Span attributes

  • status (Symbol)

    The span status (:ok or :error)



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 245

def create_step_span(event, span_name, attributes, status)
  return unless opentelemetry_available?

  task_id = safe_get(event, :task_id)
  step_id = safe_get(event, :step_id)
  return unless task_id && step_id

  task_span = get_task_span(task_id)
  return unless task_span

  otel_attributes = convert_attributes_for_otel(attributes)

  # Create child span context
  span_context = ::OpenTelemetry::Trace.context_with_span(task_span)

  ::OpenTelemetry::Context.with_current(span_context) do
    tracer.in_span(span_name, attributes: otel_attributes) do |step_span|
      # Add step event
      event_name = status == :error ? 'step.failed' : 'step.completed'
      step_span.add_event(event_name, attributes: otel_attributes)

      # Set span status
      set_span_status(step_span, status, attributes)
    end
  end
rescue StandardError => e
  Rails.logger.warn("Failed to create step span: #{e.message}")
end

#create_task_span(event, span_name, attributes) ⇒ Object

Create a root span for a task and store it for child spans

Parameters:

  • event (Hash)

    The event data

  • span_name (String)

    The name for the span

  • attributes (Hash)

    Span attributes



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 192

def create_task_span(event, span_name, attributes)
  return unless opentelemetry_available?

  task_id = safe_get(event, :task_id)
  return unless task_id

  otel_attributes = convert_attributes_for_otel(attributes)

  span = tracer.start_root_span(span_name, attributes: otel_attributes)
  store_task_span(task_id, span)

  # Add an event to mark the task start
  span.add_event('task.started', attributes: otel_attributes)
rescue StandardError => e
  Rails.logger.warn("Failed to create task span: #{e.message}")
end

#create_tracerOpenTelemetry::Tracer

Create the OpenTelemetry tracer

Returns:

  • (OpenTelemetry::Tracer)

    The tracer instance



298
299
300
301
302
303
304
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 298

def create_tracer
  config = Tasker::Configuration.configuration
  ::OpenTelemetry.tracer_provider.tracer(
    config.telemetry.service_name,
    config.telemetry.service_version
  )
end

#event_to_annotation_name(event) ⇒ Object

Convert event to annotation name



282
283
284
285
286
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 282

def event_to_annotation_name(event)
  # Simple mapping from event to annotation
  event_type = safe_get(event, :event_type)
  EVENT_ANNOTATION_MAP.fetch(event_type, 'event.processed')
end

#extract_step_attributes(event) ⇒ Object

Extract step-specific attributes (enhanced from BaseSubscriber)



158
159
160
161
162
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 158

def extract_step_attributes(event)
  super.merge(
    step_name: safe_get(event, :step_name, 'unknown_step')
  )
end

#finish_task_span(event, status, attributes) ⇒ Object

Finish a task span with the appropriate status

Parameters:

  • event (Hash)

    The event data

  • status (Symbol)

    The span status (:ok or :error)

  • attributes (Hash)

    Final span attributes



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 214

def finish_task_span(event, status, attributes)
  return unless opentelemetry_available?

  task_id = safe_get(event, :task_id)
  return unless task_id

  span = get_task_span(task_id)
  return unless span

  otel_attributes = convert_attributes_for_otel(attributes)

  # Add completion event
  event_name = status == :error ? 'task.failed' : 'task.completed'
  span.add_event(event_name, attributes: otel_attributes)

  # Set span status
  set_span_status(span, status, attributes)

  # Finish the span
  span.finish
  remove_task_span(task_id)
rescue StandardError => e
  Rails.logger.warn("Failed to finish task span: #{e.message}")
end

#handle_step_completed(event) ⇒ Object

Handle step completion events



106
107
108
109
110
111
112
113
114
115
116
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 106

def handle_step_completed(event)
  return unless should_process_event?(Tasker::Constants::StepEvents::COMPLETED)

  attributes = extract_step_attributes(event).merge(
    execution_duration: safe_get(event, :execution_duration, 0.0),
    attempt_number: safe_get(event, :attempt_number, 1)
  )

  # Create step span as child of task span
  create_step_span(event, 'tasker.step.execution', attributes, :ok)
end

#handle_step_execution_requested(event) ⇒ Object

Handle step execution requested events



94
95
96
97
98
99
100
101
102
103
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 94

def handle_step_execution_requested(event)
  return unless should_process_event?(Tasker::Constants::StepEvents::EXECUTION_REQUESTED)

  attributes = extract_step_attributes(event).merge(
    attempt_number: safe_get(event, :attempt_number, 1)
  )

  # Create a simple span for step queuing
  create_simple_span(event, 'tasker.step.queued', attributes)
end

#handle_step_failed(event) ⇒ Object

Handle step failure events



119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 119

def handle_step_failed(event)
  return unless should_process_event?(Tasker::Constants::StepEvents::FAILED)

  attributes = extract_step_attributes(event).merge(
    error: safe_get(event, :error_message, safe_get(event, :error, 'Unknown error')),
    attempt_number: safe_get(event, :attempt_number, 1),
    exception_class: safe_get(event, :exception_class, 'StandardError'),
    retry_limit: safe_get(event, :retry_limit, 3)
  )

  # Create step span as child of task span with error status
  create_step_span(event, 'tasker.step.execution', attributes, :error)
end

#handle_step_retry_requested(event) ⇒ Object

Handle step retry events



134
135
136
137
138
139
140
141
142
143
144
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 134

def handle_step_retry_requested(event)
  return unless should_process_event?(Tasker::Constants::StepEvents::RETRY_REQUESTED)

  attributes = extract_step_attributes(event).merge(
    attempt_number: safe_get(event, :attempt_number, 1),
    retry_limit: safe_get(event, :retry_limit, 3)
  )

  # Create a simple span for retry events
  create_simple_span(event, 'tasker.step.retry', attributes)
end

#handle_task_completed(event) ⇒ Object

Handle task completion events



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 62

def handle_task_completed(event)
  return unless should_process_event?(Tasker::Constants::TaskEvents::COMPLETED)

  attributes = extract_core_attributes(event).merge(
    task_name: safe_get(event, :task_name, 'unknown_task'),
    duration: calculate_duration(event),
    total_execution_duration: safe_get(event, :total_execution_duration, 0.0),
    current_execution_duration: safe_get(event, :current_execution_duration, 0.0),
    total_steps: safe_get(event, :total_steps, 0),
    completed_steps: safe_get(event, :completed_steps, 0)
  )

  # Finish the task span with success status
  finish_task_span(event, :ok, attributes)
end

#handle_task_failed(event) ⇒ Object

Handle task failure events



79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 79

def handle_task_failed(event)
  return unless should_process_event?(Tasker::Constants::TaskEvents::FAILED)

  attributes = extract_core_attributes(event).merge(
    task_name: safe_get(event, :task_name, 'unknown_task'),
    error: safe_get(event, :error_message, safe_get(event, :error, 'Unknown error')),
    exception_class: safe_get(event, :exception_class, 'StandardError'),
    failed_steps: safe_get(event, :failed_steps, 0)
  )

  # Finish the task span with error status
  finish_task_span(event, :error, attributes)
end

#handle_task_initialize_requested(event) ⇒ Object

Handle task initialization events



38
39
40
41
42
43
44
45
46
47
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 38

def handle_task_initialize_requested(event)
  return unless should_process_event?(Tasker::Constants::TaskEvents::INITIALIZE_REQUESTED)

  attributes = extract_core_attributes(event).merge(
    task_name: safe_get(event, :task_name, 'unknown_task')
  )

  # Only create basic span for initialization - task.start_requested will create the main span
  create_simple_span(event, 'tasker.task.initialize', attributes)
end

#handle_task_start_requested(event) ⇒ Object

Handle task start events



50
51
52
53
54
55
56
57
58
59
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 50

def handle_task_start_requested(event)
  return unless should_process_event?(Tasker::Constants::TaskEvents::START_REQUESTED)

  attributes = extract_core_attributes(event).merge(
    task_name: safe_get(event, :task_name, 'unknown_task')
  )

  # Create root span for task and store it for child spans
  create_task_span(event, 'tasker.task.execution', attributes)
end

#opentelemetry_available?Boolean

Check if OpenTelemetry is available and configured

Returns:

  • (Boolean)

    True if OpenTelemetry is available



291
292
293
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 291

def opentelemetry_available?
  defined?(::OpenTelemetry) && ::OpenTelemetry.tracer_provider
end

#should_process_event?(event_constant) ⇒ Boolean

Override BaseSubscriber to add telemetry-specific filtering

Returns:

  • (Boolean)


147
148
149
150
151
152
153
154
155
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 147

def should_process_event?(event_constant)
  # Get configuration once for efficiency
  config = Tasker::Configuration.configuration

  # Only process if telemetry is enabled
  return false unless config.telemetry.enabled

  super
end

#telemetry_enabled?Boolean

Check if telemetry is enabled

Returns:

  • (Boolean)


165
166
167
# File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 165

def telemetry_enabled?
  Tasker::Configuration.configuration.telemetry.enabled != false
end