Class: Tasker::Events::Subscribers::BaseSubscriber
Inherits: Object
- Object
- Tasker::Events::Subscribers::BaseSubscriber
Defined in: lib/tasker/events/subscribers/base_subscriber.rb
Overview
BaseSubscriber provides a clean foundation for creating custom event subscribers.
This class extracts common patterns from TelemetrySubscriber and provides:
- Declarative subscription registration via class methods
- Automatic method routing from event names to handler methods
- Defensive payload handling with safe accessors
- Easy integration with the Tasker event system
- Metrics collection helper methods for common patterns
Usage:
    class OrderNotificationSubscriber < Tasker::Events::Subscribers::BaseSubscriber
      subscribe_to :task_completed, :step_failed

      def handle_task_completed(event)
        OrderMailer.completion_email(event[:task_id]).deliver_later
      end

      def handle_step_failed(event)
        AlertService.notify("Step failed: #{event[:step_name]}")
      end
    end

    # Register the subscriber
    OrderNotificationSubscriber.subscribe(Tasker::Events::Publisher.instance)
Direct Known Subclasses
Defined Under Namespace
Classes: ErrorCategorizer, MetricTagsExtractor
Class Method Summary
- .filter_events(&filter_proc) ⇒ void
  Set a filter for events (optional).
- .subscribe(publisher) ⇒ BaseSubscriber
  Subscribe this subscriber to a publisher.
- .subscribe_to(*events) ⇒ void
  Declarative method to register events this subscriber cares about.
Instance Method Summary
- #build_event_subscriptions ⇒ Hash (protected)
  Build event subscriptions using explicit constants.
- #build_metric_name(base_name, event_type) ⇒ String (protected)
  Create metric name with consistent naming convention.
- #custom_event?(event_constant) ⇒ Boolean (protected)
  Check if an event is a custom event (not a system constant).
- #event_subscriptions ⇒ Hash (protected)
  Get event subscriptions mapping for this subscriber.
- #extract_core_attributes(event) ⇒ Hash (protected)
  Extract core attributes common to most events.
- #extract_error_metrics(event) ⇒ Hash (protected)
  Extract error metrics from failure events.
- #extract_metric_tags(event) ⇒ Array<String> (protected)
  Extract business metrics tags for categorization.
- #extract_numeric_metric(event, key, default = 0) ⇒ Numeric (protected)
  Extract numeric value safely for metrics.
- #extract_performance_metrics(event) ⇒ Hash (protected)
  Extract performance metrics for operational monitoring.
- #extract_step_attributes(event) ⇒ Hash (protected)
  Extract step-specific attributes (for TelemetrySubscriber compatibility).
- #extract_timing_metrics(event) ⇒ Hash (protected)
  Extract timing metrics from completion events.
- #generate_handler_method_name(event_name) ⇒ Symbol (protected)
  Generate handler method name from event name.
- #initialize(name: nil, events: nil, config: {}) ⇒ BaseSubscriber (constructor)
  A new instance of BaseSubscriber.
- #resolve_internal_event_constant(event_name) ⇒ String (protected)
  Resolve developer-friendly event name to internal constant.
- #safe_get(event, key, default = nil) ⇒ Object (protected)
  Safe accessor for event payload keys with fallback values.
- #should_handle_event?(event_constant) ⇒ Boolean (protected)
  Check if this subscriber should handle the given event.
- #should_process_event?(_event_constant) ⇒ Boolean (protected)
  Instance-level event filtering (override in subclasses).
- #subscribe_to_publisher(publisher) ⇒ void
  Subscribe to all events defined by the class.
Constructor Details
#initialize(name: nil, events: nil, config: {}) ⇒ BaseSubscriber
Returns a new instance of BaseSubscriber.
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 34
    def initialize(name: nil, events: nil, config: {})
      @subscription_name = name
      @subscription_config = config

      # If events are provided via constructor (from YAML), add them to subscribed events
      return if events.blank?

      current_events = self.class.subscribed_events || []
      self.class.subscribed_events = (current_events + Array(events)).uniq
    end
Class Method Details
.filter_events(&filter_proc) ⇒ void
This method returns an undefined value.
Set a filter for events (optional)
Example: filter_events { |event_name| event_name.include?('order') }
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 67
    def filter_events(&filter_proc)
      self.event_filter = filter_proc
    end
.subscribe(publisher) ⇒ BaseSubscriber
Subscribe this subscriber to a publisher
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 75
    def subscribe(publisher)
      subscriber = new
      subscriber.subscribe_to_publisher(publisher)
      subscriber
    end
.subscribe_to(*events) ⇒ void
This method returns an undefined value.
Declarative method to register events this subscriber cares about
Example:
    subscribe_to :task_completed, :step_failed
    subscribe_to 'order.created', 'payment.processed'

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 54
    def subscribe_to(*events)
      # Accumulate events instead of replacing them
      current_events = subscribed_events || []
      self.subscribed_events = (current_events + events.map(&:to_s)).uniq
    end
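The accumulate-and-deduplicate behaviour of subscribe_to can be illustrated with a minimal plain-Ruby sketch. DemoSubscriber is a hypothetical stand-in, not the Tasker class, and it assumes a simple class-level accessor in place of whatever storage the real class uses.

```ruby
# Hypothetical stand-in that mirrors the logic of subscribe_to shown above.
class DemoSubscriber
  class << self
    # Assumption: a plain accessor replaces the real class's storage.
    attr_accessor :subscribed_events

    def subscribe_to(*events)
      # Accumulate events instead of replacing them; symbols become strings.
      current_events = subscribed_events || []
      self.subscribed_events = (current_events + events.map(&:to_s)).uniq
    end
  end

  subscribe_to :task_completed, :step_failed
  subscribe_to 'order.created', :step_failed # the repeated name is dropped
end

p DemoSubscriber.subscribed_events
# => ["task_completed", "step_failed", "order.created"]
```

Calling subscribe_to more than once therefore extends the list, and every name is stored as a string regardless of how it was written.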
Instance Method Details
#build_event_subscriptions ⇒ Hash (protected)
Build event subscriptions using explicit constants
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 121
    def build_event_subscriptions
      subscriptions = {}

      (self.class.subscribed_events || []).each do |event_name|
        # Convert developer-friendly name to internal constant
        internal_constant = resolve_internal_event_constant(event_name)

        # Generate handler method name: order.processed -> handle_order_processed
        handler_method = generate_handler_method_name(event_name)

        # Only add if the handler method exists
        if respond_to?(handler_method, true)
          subscriptions[internal_constant] = handler_method
        else
          Rails.logger.warn("#{self.class.name}: Handler method #{handler_method} not found for event #{event_name}")
        end
      end

      subscriptions
    end
#build_metric_name(base_name, event_type) ⇒ String (protected)
Create metric name with consistent naming convention
Example:
    metric_name = build_metric_name('tasker.task', 'completed')
    # => 'tasker.task.completed'

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 433
    def build_metric_name(base_name, event_type)
      "#{base_name}.#{event_type}".squeeze('.')
    end
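The effect of squeeze('.') is easiest to see with inputs that already carry stray dots. The sketch below copies the one-line method body into a standalone function; the sample inputs are illustrative only.

```ruby
# Same expression as the method body above, as a standalone function.
def build_metric_name(base_name, event_type)
  "#{base_name}.#{event_type}".squeeze('.')
end

puts build_metric_name('tasker.task', 'completed')   # tasker.task.completed
puts build_metric_name('tasker.task.', '.completed') # tasker.task.completed
puts build_metric_name('', 'completed')              # .completed
```

Runs of consecutive dots collapse to one, but a leading or trailing dot is not removed, so an empty base name still yields a name that starts with a dot.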
#custom_event?(event_constant) ⇒ Boolean (protected)
Check if an event is a custom event (not a system constant)
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 103
    def custom_event?(event_constant)
      event_str = event_constant.to_s
      event_str.exclude?('Tasker::Constants::') && event_str.include?('.')
    end
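A plain-Ruby sketch of the same check follows. It swaps ActiveSupport's exclude? for !include?, and the constant-style string is a hypothetical example rather than a name taken from the library.

```ruby
def custom_event?(event_constant)
  event_str = event_constant.to_s
  # String#exclude? comes from ActiveSupport; !include? is the plain-Ruby equivalent.
  !event_str.include?('Tasker::Constants::') && event_str.include?('.')
end

p custom_event?('custom.order.processed')            # true
p custom_event?('order_processed')                   # false (no dot)
p custom_event?('Tasker::Constants::ExampleEvent.X') # false (hypothetical constant path)
p custom_event?('task.completed')                    # true
```

Read literally, the check only looks for a dot and the absence of the Tasker::Constants:: prefix, so in this sketch a dotted system-style name such as 'task.completed' also returns true.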
#event_subscriptions ⇒ Hash (protected)
Get event subscriptions mapping for this subscriber
This method maps event constants to handler methods using naming conventions. Override this method to customize the mapping.
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 114
    def event_subscriptions
      @event_subscriptions ||= build_event_subscriptions
    end
#extract_core_attributes(event) ⇒ Hash (protected)
Extract core attributes common to most events
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 227
    def extract_core_attributes(event)
      {
        task_id: safe_get(event, :task_id, 'unknown'),
        step_id: safe_get(event, :step_id),
        event_timestamp: safe_get(event, :timestamp, Time.current).to_s
      }.compact
    end
#extract_error_metrics(event) ⇒ Hash (protected)
Extract error metrics from failure events
Example:
    error = extract_error_metrics(event)
    StatsD.increment('tasker.errors', tags: ["error_type:#{error[:error_type]}"])

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 279
    def extract_error_metrics(event)
      {
        error_message: safe_get(event, :error_message, 'unknown_error'),
        error_class: safe_get(event, :exception_class, 'UnknownError'),
        error_type: categorize_error(safe_get(event, :exception_class)),
        attempt_number: safe_get(event, :attempt_number, 1).to_i,
        is_retryable: safe_get(event, :retryable, false),
        final_failure: safe_get(event, :attempt_number, 1).to_i >= safe_get(event, :retry_limit, 1).to_i
      }
    end
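The final_failure comparison can be isolated in a few lines. This is a sketch under simplifying assumptions: fetch_int and final_failure? are hypothetical helpers, and a symbol-keyed hash lookup stands in for safe_get.

```ruby
# Plain-hash lookup with a default; a simplified stand-in for safe_get.
def fetch_int(event, key, default)
  event.fetch(key, default).to_i
end

# Mirrors the final_failure expression: attempt_number >= retry_limit.
def final_failure?(event)
  fetch_int(event, :attempt_number, 1) >= fetch_int(event, :retry_limit, 1)
end

p final_failure?({ attempt_number: 2, retry_limit: 3 }) # false
p final_failure?({ attempt_number: 3, retry_limit: 3 }) # true
p final_failure?({})                                    # true (defaults: 1 >= 1)
```

Because both keys default to 1, a payload that carries neither attempt_number nor retry_limit is reported as a final failure.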
#extract_metric_tags(event) ⇒ Array<String> (protected)
Extract business metrics tags for categorization
Example:
    tags = extract_metric_tags(event)
    StatsD.increment('tasker.task.completed', tags: tags)

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 316
    def extract_metric_tags(event)
      MetricTagsExtractor.extract(event)
    end
#extract_numeric_metric(event, key, default = 0) ⇒ Numeric (protected)
Extract numeric value safely for metrics
Example:
    duration = extract_numeric_metric(event, :execution_duration, 0.0)
    StatsD.histogram('task.duration', duration)

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 447
    def extract_numeric_metric(event, key, default = 0)
      value = safe_get(event, key, default)

      # Handle nil values and non-numeric types
      return default if value.nil? || !value.respond_to?(:to_f)

      # Try to convert to float
      converted = value.to_f

      # Check if this was a valid numeric conversion
      # For strings that can't convert, to_f returns 0.0
      return default if converted == 0.0 && value.is_a?(String) && value !~ /\A\s*0*(\.0*)?\s*\z/

      converted
    end
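The branches of extract_numeric_metric are easier to follow against concrete payloads. The sketch below reuses the method body with a simplified safe_get that handles plain hashes only; the sample event and its keys are made up for illustration.

```ruby
# Simplified safe_get for plain hashes only (symbol key first, then string key).
def safe_get(event, key, default = nil)
  return default if event.nil?

  event.fetch(key.to_sym) { event.fetch(key.to_s, default) }
end

def extract_numeric_metric(event, key, default = 0)
  value = safe_get(event, key, default)
  return default if value.nil? || !value.respond_to?(:to_f)

  converted = value.to_f
  # A String that converts to 0.0 is accepted only if it really spells zero.
  return default if converted == 0.0 && value.is_a?(String) && value !~ /\A\s*0*(\.0*)?\s*\z/

  converted
end

event = { duration: '12.5', retries: 3, label: 'n/a', zero: '0.0', tags: %w[a b], empty: nil }

p extract_numeric_metric(event, :duration, 0.0) # 12.5
p extract_numeric_metric(event, :retries, 0.0)  # 3.0
p extract_numeric_metric(event, :label, -1.0)   # -1.0 (non-numeric string)
p extract_numeric_metric(event, :zero, -1.0)    # 0.0  (a genuine zero)
p extract_numeric_metric(event, :tags, -1.0)    # -1.0 (Array has no to_f)
p extract_numeric_metric(event, :empty, -1.0)   # -1.0 (nil value)
p extract_numeric_metric(event, :absent, -1.0)  # -1.0 (missing key)
```

One consequence of relying on String#to_f is that a string with a numeric prefix, such as '12abc', converts to 12.0 and is returned rather than replaced by the default.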
#extract_performance_metrics(event) ⇒ Hash (protected)
Extract performance metrics for operational monitoring
Example:
    perf = extract_performance_metrics(event)
    StatsD.histogram('tasker.memory_usage', perf[:memory_usage])

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 298
    def extract_performance_metrics(event)
      {
        memory_usage: safe_get(event, :memory_usage, 0).to_i,
        cpu_time: safe_get(event, :cpu_time, 0.0).to_f,
        queue_time: safe_get(event, :queue_time, 0.0).to_f,
        processing_time: safe_get(event, :processing_time, 0.0).to_f,
        retry_delay: safe_get(event, :retry_delay, 0.0).to_f
      }
    end
#extract_step_attributes(event) ⇒ Hash (protected)
Extract step-specific attributes (for TelemetrySubscriber compatibility)
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 239
    def extract_step_attributes(event)
      extract_core_attributes(event).merge(
        step_id: safe_get(event, :step_id),
        step_name: safe_get(event, :step_name, 'unknown_step')
      ).compact
    end
#extract_timing_metrics(event) ⇒ Hash (protected)
Extract timing metrics from completion events
Example:
    timing = extract_timing_metrics(event)
    StatsD.histogram('tasker.task.duration', timing[:execution_duration])
    StatsD.gauge('tasker.task.step_count', timing[:step_count])

    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 260
    def extract_timing_metrics(event)
      {
        execution_duration: safe_get(event, :execution_duration, 0.0).to_f,
        started_at: safe_get(event, :started_at),
        completed_at: safe_get(event, :completed_at),
        step_count: safe_get(event, :total_steps, 0).to_i,
        completed_steps: safe_get(event, :completed_steps, 0).to_i,
        failed_steps: safe_get(event, :failed_steps, 0).to_i
      }
    end
#generate_handler_method_name(event_name) ⇒ Symbol (protected)
Generate handler method name from event name
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 172
    def generate_handler_method_name(event_name)
      # Convert dots to underscores and prefix with handle_
      # Examples:
      #   'task.completed' -> :handle_task_completed
      #   'step.failed' -> :handle_step_failed
      #   'custom.event' -> :handle_custom_event
      clean_name = event_name.to_s.tr('.', '_').underscore
      :"handle_#{clean_name}"
    end
#resolve_internal_event_constant(event_name) ⇒ String (protected)
Resolve developer-friendly event name to internal constant
This handles the transparent namespace mapping:
- "order.processed" -> "custom.order.processed" (for custom events)
- "task.completed" -> "task.completed" (for system events)
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 150
    def resolve_internal_event_constant(event_name)
      event_str = event_name.to_s

      # Check if it's already an internal constant (starts with system prefixes)
      system_prefixes = %w[task. step. workflow. observability.]
      if system_prefixes.any? { |prefix| event_str.start_with?(prefix) }
        return event_str # It's a system event, use as-is
      end

      # Check if it's already prefixed with custom.
      if event_str.start_with?('custom.')
        return event_str # Already internal format
      end

      # Assume it's a custom event and add the prefix
      "custom.#{event_str}"
    end
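The namespace mapping and the handler naming can be traced side by side. The following is a plain-Ruby sketch of both methods; it assumes event names are already lowercase, so the ActiveSupport underscore call is left out, and SYSTEM_PREFIXES is a name introduced here for the example.

```ruby
SYSTEM_PREFIXES = %w[task. step. workflow. observability.].freeze

def resolve_internal_event_constant(event_name)
  event_str = event_name.to_s
  # System events and already-prefixed custom events are used as-is.
  return event_str if SYSTEM_PREFIXES.any? { |prefix| event_str.start_with?(prefix) }
  return event_str if event_str.start_with?('custom.')

  "custom.#{event_str}"
end

# Assumption: names are already lowercase, so ActiveSupport's underscore is omitted.
def generate_handler_method_name(event_name)
  :"handle_#{event_name.to_s.tr('.', '_')}"
end

%w[order.processed custom.order.processed task.completed].each do |name|
  puts "#{name} -> #{resolve_internal_event_constant(name)} / #{generate_handler_method_name(name).inspect}"
end
# order.processed -> custom.order.processed / :handle_order_processed
# custom.order.processed -> custom.order.processed / :handle_custom_order_processed
# task.completed -> task.completed / :handle_task_completed
```

Since build_event_subscriptions derives the handler name from the name as written rather than from the resolved constant, 'order.processed' and 'custom.order.processed' resolve to the same internal event but route to differently named handlers.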
#safe_get(event, key, default = nil) ⇒ Object (protected)
Safe accessor for event payload keys with fallback values
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 208
    def safe_get(event, key, default = nil)
      return default if event.nil?

      # Handle Dry::Events::Event objects
      if event.respond_to?(:payload)
        payload = event.payload
        return payload.fetch(key.to_sym) { payload.fetch(key.to_s, default) }
      end

      # Handle plain hash events
      event.fetch(key.to_sym) do
        event.fetch(key.to_s, default)
      end
    end
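The lookup order of safe_get (symbol key, then string key, then default) can be exercised without the event system. In this sketch PayloadEvent is a hypothetical Struct standing in for an event object that exposes payload; the keys and values are illustrative.

```ruby
# Hypothetical stand-in for an event object that exposes #payload.
PayloadEvent = Struct.new(:payload)

def safe_get(event, key, default = nil)
  return default if event.nil?

  if event.respond_to?(:payload)
    payload = event.payload
    return payload.fetch(key.to_sym) { payload.fetch(key.to_s, default) }
  end

  event.fetch(key.to_sym) { event.fetch(key.to_s, default) }
end

wrapped = PayloadEvent.new({ 'task_id' => 42 })
plain   = { step_name: 'charge_card', error: nil }

p safe_get(wrapped, :task_id)           # 42 (falls back to the string key)
p safe_get(plain, 'step_name')          # "charge_card" (string converted to symbol)
p safe_get(plain, :missing, 'fallback') # "fallback"
p safe_get(nil, :task_id, 'unknown')    # "unknown"
p safe_get(plain, :error, 'fallback')   # nil
```

The last call shows a detail of fetch: a key that is present with a nil value returns nil, and the default applies only when the key is absent under both spellings.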
#should_handle_event?(event_constant) ⇒ Boolean (protected)
Check if this subscriber should handle the given event
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 186
    def should_handle_event?(event_constant)
      # Apply class-level filter if defined
      return false if self.class.event_filter && !self.class.event_filter.call(event_constant)

      # Apply instance-level filtering (override in subclasses)
      should_process_event?(event_constant)
    end
#should_process_event?(_event_constant) ⇒ Boolean (protected)
Instance-level event filtering (override in subclasses)
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 198
    def should_process_event?(_event_constant)
      true # Process all events by default
    end
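The two filtering layers, the class-level filter_events block and the instance-level should_process_event? override, combine as sketched below. FilterDemo is a hypothetical stand-in class; its methods are left public so they can be called directly, whereas the real ones are protected, and the '.refunded' rule is invented for the example.

```ruby
class FilterDemo
  class << self
    attr_accessor :event_filter

    def filter_events(&filter_proc)
      self.event_filter = filter_proc
    end
  end

  # Class-level filter: only event names that mention "order".
  filter_events { |event_name| event_name.include?('order') }

  def should_handle_event?(event_constant)
    return false if self.class.event_filter && !self.class.event_filter.call(event_constant)

    should_process_event?(event_constant)
  end

  # Instance-level override: additionally skip refund events.
  def should_process_event?(event_constant)
    !event_constant.end_with?('.refunded')
  end
end

demo = FilterDemo.new
p demo.should_handle_event?('custom.order.created')  # true
p demo.should_handle_event?('custom.order.refunded') # false (instance-level rule)
p demo.should_handle_event?('task.completed')        # false (class-level filter)
```

An event has to pass both layers, and the class-level filter is consulted first, so should_process_event? never sees names that the filter block rejected.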
#subscribe_to_publisher(publisher) ⇒ void
This method returns an undefined value.
Subscribe to all events defined by the class
    # File 'lib/tasker/events/subscribers/base_subscriber.rb', line 86
    def subscribe_to_publisher(publisher)
      event_subscriptions.each do |event_constant, handler_method|
        # Apply filtering if defined
        next unless should_handle_event?(event_constant)

        # Subscribe to the event with automatic method routing
        # This will fail fast if the event doesn't exist, which is the correct behavior
        publisher.subscribe(event_constant, &method(handler_method))
      end
    end
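The wiring performed by subscribe_to_publisher can be tried end to end with an in-memory publisher. Everything in this sketch is a hypothetical stand-in: FakePublisher only assumes the subscribe(event_name, &block) shape implied by the listing above, and MiniSubscriber hard-codes the mapping that event_subscriptions would normally build.

```ruby
# Hypothetical in-memory publisher; not the Tasker publisher.
class FakePublisher
  def initialize
    @handlers = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(event_name, &block)
    @handlers[event_name] << block
  end

  def publish(event_name, payload = {})
    @handlers[event_name].each { |handler| handler.call(payload) }
  end
end

# Hypothetical stand-in for the subscriber side of the wiring.
class MiniSubscriber
  attr_reader :seen

  def initialize
    @seen = []
  end

  # Event constant => handler method, as event_subscriptions would return.
  def event_subscriptions
    { 'task.completed' => :handle_task_completed,
      'custom.order.processed' => :handle_order_processed }
  end

  def should_handle_event?(_event_constant)
    true
  end

  def subscribe_to_publisher(publisher)
    event_subscriptions.each do |event_constant, handler_method|
      next unless should_handle_event?(event_constant)

      # method(...) returns a Method object; & turns it into the block.
      publisher.subscribe(event_constant, &method(handler_method))
    end
  end

  private

  def handle_task_completed(event)
    @seen << [:task, event[:task_id]]
  end

  def handle_order_processed(event)
    @seen << [:order, event[:order_id]]
  end
end

publisher = FakePublisher.new
subscriber = MiniSubscriber.new
subscriber.subscribe_to_publisher(publisher)

publisher.publish('task.completed', { task_id: 7 })
publisher.publish('custom.order.processed', { order_id: 'A-1' })
publisher.publish('step.failed', { step_id: 3 }) # no handler registered

p subscriber.seen
# => [[:task, 7], [:order, "A-1"]]
```

Handlers can stay private because method(handler_method) is resolved inside the instance, which matches the respond_to?(handler_method, true) check used when the mapping is built.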