Class: Tasker::Events::Subscribers::TelemetrySubscriber
Inherits: BaseSubscriber
- Object
  - BaseSubscriber
    - Tasker::Events::Subscribers::TelemetrySubscriber
Defined in: lib/tasker/events/subscribers/telemetry_subscriber.rb
Overview
TelemetrySubscriber handles telemetry events for observability.
This subscriber creates OpenTelemetry spans with proper hierarchical context for distributed tracing in systems like Jaeger. It follows OpenTelemetry best practices by creating detailed spans for debugging while allowing metrics to be derived from span data.
Architecture Decision:
- SPANS: Individual trace records for detailed debugging (this class)
- METRICS: Aggregated data for dashboards/alerts (derived from spans or separate collectors)
Constant Summary
- EVENT_ANNOTATION_MAP =
  Event type to annotation name mapping

    {
      'initialize_requested' => 'task.initialize',
      'execution_requested' => 'step.queued',
      'retry_requested' => 'step.retry'
    }.freeze
Instance Attribute Summary
- #tracer ⇒ Object
  Returns the value of attribute tracer.
Instance Method Summary
- #convert_attributes_for_otel(attributes) ⇒ Hash
  Convert event attributes to OpenTelemetry format.
- #create_simple_span(event, span_name, attributes) ⇒ Object
  Create a simple span for events that don’t need complex hierarchy.
- #create_step_span(event, span_name, attributes, status) ⇒ Object
  Create a step span as a child of the task span.
- #create_task_span(event, span_name, attributes) ⇒ Object
  Create a root span for a task and store it for child spans.
- #create_tracer ⇒ OpenTelemetry::Tracer
  Create the OpenTelemetry tracer.
- #event_to_annotation_name(event) ⇒ Object
  Convert event to annotation name.
- #extract_step_attributes(event) ⇒ Object
  Extract step-specific attributes (enhanced from BaseSubscriber).
- #finish_task_span(event, status, attributes) ⇒ Object
  Finish a task span with the appropriate status.
- #handle_step_completed(event) ⇒ Object
  Handle step completion events.
- #handle_step_execution_requested(event) ⇒ Object
  Handle step execution requested events.
- #handle_step_failed(event) ⇒ Object
  Handle step failure events.
- #handle_step_retry_requested(event) ⇒ Object
  Handle step retry events.
- #handle_task_completed(event) ⇒ Object
  Handle task completion events.
- #handle_task_failed(event) ⇒ Object
  Handle task failure events.
- #handle_task_initialize_requested(event) ⇒ Object
  Handle task initialization events.
- #handle_task_start_requested(event) ⇒ Object
  Handle task start events.
- #initialize ⇒ TelemetrySubscriber (constructor)
  A new instance of TelemetrySubscriber.
- #opentelemetry_available? ⇒ Boolean
  Check if OpenTelemetry is available and configured.
- #should_process_event?(event_constant) ⇒ Boolean
  Override BaseSubscriber to add telemetry-specific filtering.
- #telemetry_enabled? ⇒ Boolean
  Check if telemetry is enabled.
Methods inherited from BaseSubscriber
filter_events, subscribe, subscribe_to, #subscribe_to_publisher
Constructor Details
#initialize ⇒ TelemetrySubscriber
Returns a new instance of TelemetrySubscriber.
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 32

    def initialize
      super
      @tracer = create_tracer
    end
Instance Attribute Details
#tracer ⇒ Object
Returns the value of attribute tracer.
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 20

    def tracer
      @tracer
    end
Instance Method Details
#convert_attributes_for_otel(attributes) ⇒ Hash
Convert event attributes to OpenTelemetry format
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 310

    def convert_attributes_for_otel(attributes)
      result = {}
      config = Tasker::Configuration.configuration
      service_name = config.telemetry.service_name

      # Filter sensitive data first
      filtered_attributes = filter_sensitive_attributes(attributes)

      # Ensure attributes are properly formatted for OpenTelemetry
      filtered_attributes.each do |key, value|
        # Skip exception_object as it can't be properly serialized
        next if key == :exception_object

        # Use the configured service name as prefix for all attributes
        attr_prefix = "#{service_name}."
        attr_key = key.to_s.start_with?(attr_prefix) ? key.to_s : "#{attr_prefix}#{key}"

        # Convert values based on their type
        result[attr_key] = convert_value_for_otel(value)
      end

      result
    end
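The key-prefixing rule in convert_attributes_for_otel can be tried in isolation. The following is a minimal sketch, not the library's implementation: prefix_attributes is a hypothetical helper, the service name is passed in rather than read from Tasker::Configuration, sensitive-data filtering is omitted, and values are simply stringified where the real method calls convert_value_for_otel.

```ruby
# Hypothetical stand-in for the prefixing step only.
def prefix_attributes(attributes, service_name)
  attr_prefix = "#{service_name}."
  attributes.each_with_object({}) do |(key, value), result|
    # exception_object is skipped, as in the documented method
    next if key == :exception_object

    key_str = key.to_s
    # Keys that already carry the prefix are left unchanged
    attr_key = key_str.start_with?(attr_prefix) ? key_str : "#{attr_prefix}#{key_str}"
    # Assumption: stringify values; the real code delegates to convert_value_for_otel
    result[attr_key] = value.to_s
  end
end

prefixed = prefix_attributes(
  { task_id: 42, 'tasker.step_name' => 'fetch', exception_object: RuntimeError.new('boom') },
  'tasker'
)
# prefixed has exactly two keys: 'tasker.task_id' and 'tasker.step_name'
```

Checking start_with? before prefixing keeps the operation idempotent, so attributes that pass through the conversion twice do not end up with a doubled prefix.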
#create_simple_span(event, span_name, attributes) ⇒ Object
Create a simple span for events that don’t need complex hierarchy
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 174

    def create_simple_span(event, span_name, attributes)
      return unless opentelemetry_available?

      otel_attributes = convert_attributes_for_otel(attributes)

      tracer.in_span(span_name, attributes: otel_attributes) do |span|
        # Add event annotation
        span.add_event(event_to_annotation_name(event), attributes: otel_attributes)
      end
    rescue StandardError => e
      Rails.logger.warn("Failed to create simple span: #{e.message}")
    end
#create_step_span(event, span_name, attributes, status) ⇒ Object
Create a step span as a child of the task span
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 245

    def create_step_span(event, span_name, attributes, status)
      return unless opentelemetry_available?

      task_id = safe_get(event, :task_id)
      step_id = safe_get(event, :step_id)
      return unless task_id && step_id

      task_span = get_task_span(task_id)
      return unless task_span

      otel_attributes = convert_attributes_for_otel(attributes)

      # Create child span context
      span_context = ::OpenTelemetry::Trace.context_with_span(task_span)

      ::OpenTelemetry::Context.with_current(span_context) do
        tracer.in_span(span_name, attributes: otel_attributes) do |step_span|
          # Add step event
          event_name = status == :error ? 'step.failed' : 'step.completed'
          step_span.add_event(event_name, attributes: otel_attributes)

          # Set span status
          set_span_status(step_span, status, attributes)
        end
      end
    rescue StandardError => e
      Rails.logger.warn("Failed to create step span: #{e.message}")
    end
#create_task_span(event, span_name, attributes) ⇒ Object
Create a root span for a task and store it for child spans
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 192

    def create_task_span(event, span_name, attributes)
      return unless opentelemetry_available?

      task_id = safe_get(event, :task_id)
      return unless task_id

      otel_attributes = convert_attributes_for_otel(attributes)

      span = tracer.start_root_span(span_name, attributes: otel_attributes)
      store_task_span(task_id, span)

      # Add an event to mark the task start
      span.add_event('task.started', attributes: otel_attributes)
    rescue StandardError => e
      Rails.logger.warn("Failed to create task span: #{e.message}")
    end
#create_tracer ⇒ OpenTelemetry::Tracer
Create the OpenTelemetry tracer
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 298

    def create_tracer
      config = Tasker::Configuration.configuration
      ::OpenTelemetry.tracer_provider.tracer(
        config.telemetry.service_name,
        config.telemetry.service_version
      )
    end
#event_to_annotation_name(event) ⇒ Object
Convert event to annotation name
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 282

    def event_to_annotation_name(event)
      # Simple mapping from event to annotation
      event_type = safe_get(event, :event_type)
      EVENT_ANNOTATION_MAP.fetch(event_type, 'event.processed')
    end
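The lookup relies on Hash#fetch with a default, so any event type missing from EVENT_ANNOTATION_MAP is annotated as 'event.processed'. A minimal sketch of that behavior, assuming events are plain hashes (the real method reads them through safe_get) and wrapping everything in a hypothetical AnnotationSketch module:

```ruby
module AnnotationSketch
  # Copied from the constant documented on this page
  EVENT_ANNOTATION_MAP = {
    'initialize_requested' => 'task.initialize',
    'execution_requested' => 'step.queued',
    'retry_requested' => 'step.retry'
  }.freeze

  # Assumption: the event is a Hash with an :event_type key
  def self.annotation_for(event)
    EVENT_ANNOTATION_MAP.fetch(event[:event_type], 'event.processed')
  end
end

mapped   = AnnotationSketch.annotation_for({ event_type: 'retry_requested' })
unmapped = AnnotationSketch.annotation_for({ event_type: 'completed' })
# mapped is 'step.retry'; unmapped falls back to 'event.processed'
```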
#extract_step_attributes(event) ⇒ Object
Extract step-specific attributes (enhanced from BaseSubscriber)
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 158

    def extract_step_attributes(event)
      super.merge(
        step_name: safe_get(event, :step_name, 'unknown_step')
      )
    end
#finish_task_span(event, status, attributes) ⇒ Object
Finish a task span with the appropriate status
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 214

    def finish_task_span(event, status, attributes)
      return unless opentelemetry_available?

      task_id = safe_get(event, :task_id)
      return unless task_id

      span = get_task_span(task_id)
      return unless span

      otel_attributes = convert_attributes_for_otel(attributes)

      # Add completion event
      event_name = status == :error ? 'task.failed' : 'task.completed'
      span.add_event(event_name, attributes: otel_attributes)

      # Set span status
      set_span_status(span, status, attributes)

      # Finish the span
      span.finish
      remove_task_span(task_id)
    rescue StandardError => e
      Rails.logger.warn("Failed to finish task span: #{e.message}")
    end
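create_task_span, create_step_span, and finish_task_span depend on store_task_span, get_task_span, and remove_task_span, which are not documented on this page. The sketch below shows one way such a registry could work, assuming a Mutex-guarded Hash keyed by task ID. TaskSpanRegistry and its method names are hypothetical and may differ from what the library actually does.

```ruby
# Hypothetical registry; the real helpers are not shown on this page.
class TaskSpanRegistry
  def initialize
    @spans = {}
    @lock = Mutex.new
  end

  # Remember the root span so step spans can later attach to it
  def store(task_id, span)
    @lock.synchronize { @spans[task_id] = span }
  end

  # Returns nil when no span was stored for the task
  def get(task_id)
    @lock.synchronize { @spans[task_id] }
  end

  # Drop the entry once the task span has been finished
  def remove(task_id)
    @lock.synchronize { @spans.delete(task_id) }
  end
end

registry = TaskSpanRegistry.new
registry.store(7, :fake_span) # a symbol stands in for an OpenTelemetry span
found = registry.get(7)
registry.remove(7)
missing = registry.get(7)
```

Removing the entry after span.finish matters in a long-running process: without it, the registry would keep a reference to every finished task span.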
#handle_step_completed(event) ⇒ Object
Handle step completion events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 106

    def handle_step_completed(event)
      return unless should_process_event?(Tasker::Constants::StepEvents::COMPLETED)

      attributes = extract_step_attributes(event).merge(
        execution_duration: safe_get(event, :execution_duration, 0.0),
        attempt_number: safe_get(event, :attempt_number, 1)
      )

      # Create step span as child of task span
      create_step_span(event, 'tasker.step.execution', attributes, :ok)
    end
#handle_step_execution_requested(event) ⇒ Object
Handle step execution requested events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 94

    def handle_step_execution_requested(event)
      return unless should_process_event?(Tasker::Constants::StepEvents::EXECUTION_REQUESTED)

      attributes = extract_step_attributes(event).merge(
        attempt_number: safe_get(event, :attempt_number, 1)
      )

      # Create a simple span for step queuing
      create_simple_span(event, 'tasker.step.queued', attributes)
    end
#handle_step_failed(event) ⇒ Object
Handle step failure events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 119

    def handle_step_failed(event)
      return unless should_process_event?(Tasker::Constants::StepEvents::FAILED)

      attributes = extract_step_attributes(event).merge(
        error: safe_get(event, :error_message, safe_get(event, :error, 'Unknown error')),
        attempt_number: safe_get(event, :attempt_number, 1),
        exception_class: safe_get(event, :exception_class, 'StandardError'),
        retry_limit: safe_get(event, :retry_limit, 3)
      )

      # Create step span as child of task span with error status
      create_step_span(event, 'tasker.step.execution', attributes, :error)
    end
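The error attribute is resolved through a nested safe_get call: :error_message wins, then :error, then the literal 'Unknown error'. safe_get is inherited and not shown on this page, so the version below is a hypothetical stand-in that assumes Hash events and treats nil as missing.

```ruby
# Hypothetical stand-in for the inherited safe_get helper
def safe_get(event, key, default = nil)
  value = event[key]
  value.nil? ? default : value
end

# Same nesting as the :error attribute in handle_step_failed
def error_text(event)
  safe_get(event, :error_message, safe_get(event, :error, 'Unknown error'))
end

both    = error_text({ error_message: 'timeout', error: 'E42' })
only    = error_text({ error: 'E42' })
neither = error_text({})
# both is 'timeout', only is 'E42', neither is 'Unknown error'
```

Because the inner call is an ordinary argument, it is evaluated even when :error_message is present. That is harmless for a lookup like this one.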
#handle_step_retry_requested(event) ⇒ Object
Handle step retry events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 134

    def handle_step_retry_requested(event)
      return unless should_process_event?(Tasker::Constants::StepEvents::RETRY_REQUESTED)

      attributes = extract_step_attributes(event).merge(
        attempt_number: safe_get(event, :attempt_number, 1),
        retry_limit: safe_get(event, :retry_limit, 3)
      )

      # Create a simple span for retry events
      create_simple_span(event, 'tasker.step.retry', attributes)
    end
#handle_task_completed(event) ⇒ Object
Handle task completion events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 62

    def handle_task_completed(event)
      return unless should_process_event?(Tasker::Constants::TaskEvents::COMPLETED)

      attributes = extract_core_attributes(event).merge(
        task_name: safe_get(event, :task_name, 'unknown_task'),
        duration: calculate_duration(event),
        total_execution_duration: safe_get(event, :total_execution_duration, 0.0),
        current_execution_duration: safe_get(event, :current_execution_duration, 0.0),
        total_steps: safe_get(event, :total_steps, 0),
        completed_steps: safe_get(event, :completed_steps, 0)
      )

      # Finish the task span with success status
      finish_task_span(event, :ok, attributes)
    end
#handle_task_failed(event) ⇒ Object
Handle task failure events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 79

    def handle_task_failed(event)
      return unless should_process_event?(Tasker::Constants::TaskEvents::FAILED)

      attributes = extract_core_attributes(event).merge(
        task_name: safe_get(event, :task_name, 'unknown_task'),
        error: safe_get(event, :error_message, safe_get(event, :error, 'Unknown error')),
        exception_class: safe_get(event, :exception_class, 'StandardError'),
        failed_steps: safe_get(event, :failed_steps, 0)
      )

      # Finish the task span with error status
      finish_task_span(event, :error, attributes)
    end
#handle_task_initialize_requested(event) ⇒ Object
Handle task initialization events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 38

    def handle_task_initialize_requested(event)
      return unless should_process_event?(Tasker::Constants::TaskEvents::INITIALIZE_REQUESTED)

      attributes = extract_core_attributes(event).merge(
        task_name: safe_get(event, :task_name, 'unknown_task')
      )

      # Only create basic span for initialization - task.start_requested will create the main span
      create_simple_span(event, 'tasker.task.initialize', attributes)
    end
#handle_task_start_requested(event) ⇒ Object
Handle task start events
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 50

    def handle_task_start_requested(event)
      return unless should_process_event?(Tasker::Constants::TaskEvents::START_REQUESTED)

      attributes = extract_core_attributes(event).merge(
        task_name: safe_get(event, :task_name, 'unknown_task')
      )

      # Create root span for task and store it for child spans
      create_task_span(event, 'tasker.task.execution', attributes)
    end
#opentelemetry_available? ⇒ Boolean
Check if OpenTelemetry is available and configured
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 291

    def opentelemetry_available?
      defined?(::OpenTelemetry) && ::OpenTelemetry.tracer_provider
    end
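The guard combines defined? with &&, so the right-hand side is never evaluated when the constant is not loaded. The result is nil or whatever tracer_provider returns, so callers should treat it as truthy or falsy rather than as a strict Boolean. The sketch below reproduces the pattern with a hypothetical constant, HypotheticalTelemetryLib, so that it runs without the OpenTelemetry gem.

```ruby
def optional_lib_available?
  # defined? yields nil for an unloaded constant, which short-circuits the &&
  defined?(::HypotheticalTelemetryLib) && ::HypotheticalTelemetryLib.provider
end

before_load = optional_lib_available?

# Simulate the library being loaded later
module HypotheticalTelemetryLib
  def self.provider
    :provider
  end
end

after_load = optional_lib_available?
# before_load is nil; after_load is :provider (truthy, but not true)
```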
#should_process_event?(event_constant) ⇒ Boolean
Override BaseSubscriber to add telemetry-specific filtering
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 147

    def should_process_event?(event_constant)
      # Get configuration once for efficiency
      config = Tasker::Configuration.configuration

      # Only process if telemetry is enabled
      return false unless config.telemetry.enabled

      super
    end
#telemetry_enabled? ⇒ Boolean
Check if telemetry is enabled
    # File 'lib/tasker/events/subscribers/telemetry_subscriber.rb', line 165

    def telemetry_enabled?
      Tasker::Configuration.configuration.telemetry.enabled != false
    end
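telemetry_enabled? and should_process_event? test the same setting in slightly different ways: the former compares against false, the latter checks truthiness. The two agree for true and false but differ when the value is nil. Whether nil can occur in practice depends on the configuration default, which is not shown on this page. A minimal sketch with a hypothetical TelemetryConfig struct standing in for config.telemetry:

```ruby
# Hypothetical stand-in for config.telemetry; only `enabled` matters here
TelemetryConfig = Struct.new(:enabled)

# Mirrors the guard in should_process_event?: any falsy value blocks processing
def passes_event_guard?(config)
  config.enabled ? true : false
end

# Mirrors telemetry_enabled?: only an explicit false disables telemetry
def enabled_unless_false?(config)
  config.enabled != false
end

unset = TelemetryConfig.new(nil)
guard_result   = passes_event_guard?(unset)
enabled_result = enabled_unless_false?(unset)
# guard_result is false while enabled_result is true
```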