Class: Tasker::Events::Subscribers::BaseSubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/tasker/events/subscribers/base_subscriber.rb

Overview

BaseSubscriber provides a clean foundation for creating custom event subscribers

This class extracts common patterns from TelemetrySubscriber and provides:

  • Declarative subscription registration via class methods
  • Automatic method routing from event names to handler methods
  • Defensive payload handling with safe accessors
  • Easy integration with the Tasker event system
  • Metrics collection helper methods for common patterns

Usage: class OrderNotificationSubscriber < Tasker::Events::Subscribers::BaseSubscriber subscribe_to :task_completed, :step_failed

def handle_task_completed(event)
  OrderMailer.completion_email(event[:task_id]).deliver_later
end

def handle_step_failed(event)
  AlertService.notify("Step failed: #{event[:step_name]}")
end

end

# Register the subscriber OrderNotificationSubscriber.subscribe(Tasker::Events::Publisher.instance)

Direct Known Subclasses

MetricsSubscriber, TelemetrySubscriber

Defined Under Namespace

Classes: ErrorCategorizer, MetricTagsExtractor

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name: nil, events: nil, config: {}) ⇒ BaseSubscriber

Returns a new instance of BaseSubscriber.



34
35
36
37
38
39
40
41
42
43
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 34

def initialize(name: nil, events: nil, config: {})
  @subscription_name = name
  @subscription_config = config

  # If events are provided via constructor (from YAML), add them to subscribed events
  return if events.blank?

  current_events = self.class.subscribed_events || []
  self.class.subscribed_events = (current_events + Array(events)).uniq
end

Class Method Details

.filter_events(&filter_proc) ⇒ void

This method returns an undefined value.

Set a filter for events (optional)

Example: filter_events { |event_name| event_name.include?('order') }

Parameters:

  • filter_proc (Proc)

    A proc that returns true for events to process



67
68
69
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 67

def filter_events(&filter_proc)
  self.event_filter = filter_proc
end

.subscribe(publisher) ⇒ BaseSubscriber

Subscribe this subscriber to a publisher

Parameters:

Returns:



75
76
77
78
79
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 75

def subscribe(publisher)
  subscriber = new
  subscriber.subscribe_to_publisher(publisher)
  subscriber
end

.subscribe_to(*events) ⇒ void

This method returns an undefined value.

Declarative method to register events this subscriber cares about

Example: subscribe_to :task_completed, :step_failed subscribe_to 'order.created', 'payment.processed'

Parameters:

  • events (Array<Symbol, String>)

    Event names to subscribe to



54
55
56
57
58
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 54

def subscribe_to(*events)
  # Accumulate events instead of replacing them
  current_events = subscribed_events || []
  self.subscribed_events = (current_events + events.map(&:to_s)).uniq
end

Instance Method Details

#build_event_subscriptionsHash (protected)

Build event subscriptions using explicit constants

Returns:

  • (Hash)

    Mapping of event constants to handler method symbols



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 121

def build_event_subscriptions
  subscriptions = {}

  (self.class.subscribed_events || []).each do |event_name|
    # Convert developer-friendly name to internal constant
    internal_constant = resolve_internal_event_constant(event_name)

    # Generate handler method name: order.processed -> handle_order_processed
    handler_method = generate_handler_method_name(event_name)

    # Only add if the handler method exists
    if respond_to?(handler_method, true)
      subscriptions[internal_constant] = handler_method
    else
      Rails.logger.warn("#{self.class.name}: Handler method #{handler_method} not found for event #{event_name}")
    end
  end

  subscriptions
end

#build_metric_name(base_name, event_type) ⇒ String (protected)

Create metric name with consistent naming convention

Example: metric_name = build_metric_name('tasker.task', 'completed') # => 'tasker.task.completed'

Parameters:

  • base_name (String)

    The base metric name

  • event_type (String)

    The event type (completed, failed, etc.)

Returns:

  • (String)

    Standardized metric name



433
434
435
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 433

def build_metric_name(base_name, event_type)
  "#{base_name}.#{event_type}".squeeze('.')
end

#custom_event?(event_constant) ⇒ Boolean (protected)

Check if an event is a custom event (not a system constant)

Parameters:

  • event_constant (String)

    The event constant or name

Returns:

  • (Boolean)

    Whether it's a custom event



103
104
105
106
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 103

def custom_event?(event_constant)
  event_str = event_constant.to_s
  event_str.exclude?('Tasker::Constants::') && event_str.include?('.')
end

#event_subscriptionsHash (protected)

Get event subscriptions mapping for this subscriber

This method maps event constants to handler methods using naming conventions. Override this method to customize the mapping.

Returns:

  • (Hash)

    Mapping of event constants to handler method symbols



114
115
116
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 114

def event_subscriptions
  @event_subscriptions ||= build_event_subscriptions
end

#extract_core_attributes(event) ⇒ Hash (protected)

Extract core attributes common to most events

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload or event object

Returns:

  • (Hash)

    Core attributes



227
228
229
230
231
232
233
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 227

def extract_core_attributes(event)
  {
    task_id: safe_get(event, :task_id, 'unknown'),
    step_id: safe_get(event, :step_id),
    event_timestamp: safe_get(event, :timestamp, Time.current).to_s
  }.compact
end

#extract_error_metrics(event) ⇒ Hash (protected)

Extract error metrics from failure events

Example: error = extract_error_metrics(event) StatsD.increment('tasker.errors', tags: ["error_type:#error[:error_type]"])

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload

Returns:

  • (Hash)

    Error metrics with categorization



279
280
281
282
283
284
285
286
287
288
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 279

def extract_error_metrics(event)
  {
    error_message: safe_get(event, :error_message, 'unknown_error'),
    error_class: safe_get(event, :exception_class, 'UnknownError'),
    error_type: categorize_error(safe_get(event, :exception_class)),
    attempt_number: safe_get(event, :attempt_number, 1).to_i,
    is_retryable: safe_get(event, :retryable, false),
    final_failure: safe_get(event, :attempt_number, 1).to_i >= safe_get(event, :retry_limit, 1).to_i
  }
end

#extract_metric_tags(event) ⇒ Array<String> (protected)

Extract business metrics tags for categorization

Example: tags = extract_metric_tags(event) StatsD.increment('tasker.task.completed', tags: tags)

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload

Returns:

  • (Array<String>)

    Array of tag strings for metrics systems



316
317
318
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 316

def extract_metric_tags(event)
  MetricTagsExtractor.extract(event)
end

#extract_numeric_metric(event, key, default = 0) ⇒ Numeric (protected)

Extract numeric value safely for metrics

Example: duration = extract_numeric_metric(event, :execution_duration, 0.0) StatsD.histogram('task.duration', duration)

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload

  • key (Symbol, String)

    The key to extract

  • default (Numeric) (defaults to: 0)

    The default value

Returns:

  • (Numeric)

    The numeric value



447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 447

def extract_numeric_metric(event, key, default = 0)
  value = safe_get(event, key, default)

  # Handle nil values and non-numeric types
  return default if value.nil? || !value.respond_to?(:to_f)

  # Try to convert to float
  converted = value.to_f

  # Check if this was a valid numeric conversion
  # For strings that can't convert, to_f returns 0.0
  return default if converted == 0.0 && value.is_a?(String) && value !~ /\A\s*0*(\.0*)?\s*\z/

  converted
end

#extract_performance_metrics(event) ⇒ Hash (protected)

Extract performance metrics for operational monitoring

Example: perf = extract_performance_metrics(event) StatsD.histogram('tasker.memory_usage', perf[:memory_usage])

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload

Returns:

  • (Hash)

    Performance metrics



298
299
300
301
302
303
304
305
306
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 298

def extract_performance_metrics(event)
  {
    memory_usage: safe_get(event, :memory_usage, 0).to_i,
    cpu_time: safe_get(event, :cpu_time, 0.0).to_f,
    queue_time: safe_get(event, :queue_time, 0.0).to_f,
    processing_time: safe_get(event, :processing_time, 0.0).to_f,
    retry_delay: safe_get(event, :retry_delay, 0.0).to_f
  }
end

#extract_step_attributes(event) ⇒ Hash (protected)

Extract step-specific attributes (for TelemetrySubscriber compatibility)

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload or event object

Returns:

  • (Hash)

    Step attributes



239
240
241
242
243
244
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 239

def extract_step_attributes(event)
  extract_core_attributes(event).merge(
    step_id: safe_get(event, :step_id),
    step_name: safe_get(event, :step_name, 'unknown_step')
  ).compact
end

#extract_timing_metrics(event) ⇒ Hash (protected)

Extract timing metrics from completion events

Example: timing = extract_timing_metrics(event) StatsD.histogram('tasker.task.duration', timing[:execution_duration]) StatsD.gauge('tasker.task.step_count', timing[:step_count])

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload

Returns:

  • (Hash)

    Timing metrics with default values



260
261
262
263
264
265
266
267
268
269
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 260

def extract_timing_metrics(event)
  {
    execution_duration: safe_get(event, :execution_duration, 0.0).to_f,
    started_at: safe_get(event, :started_at),
    completed_at: safe_get(event, :completed_at),
    step_count: safe_get(event, :total_steps, 0).to_i,
    completed_steps: safe_get(event, :completed_steps, 0).to_i,
    failed_steps: safe_get(event, :failed_steps, 0).to_i
  }
end

#generate_handler_method_name(event_name) ⇒ Symbol (protected)

Generate handler method name from event name

Parameters:

  • event_name (String)

    The event name (should be consistent format)

Returns:

  • (Symbol)

    The handler method name



172
173
174
175
176
177
178
179
180
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 172

def generate_handler_method_name(event_name)
  # Convert dots to underscores and prefix with handle_
  # Examples:
  #   'task.completed' -> :handle_task_completed
  #   'step.failed' -> :handle_step_failed
  #   'custom.event' -> :handle_custom_event
  clean_name = event_name.to_s.tr('.', '_').underscore
  :"handle_#{clean_name}"
end

#resolve_internal_event_constant(event_name) ⇒ String (protected)

Resolve developer-friendly event name to internal constant

This handles the transparent namespace mapping:

  • "order.processed" -> "custom.order.processed" (for custom events)
  • "task.completed" -> "task.completed" (for system events)

Parameters:

  • event_name (String)

    The developer-friendly event name

Returns:

  • (String)

    The internal event constant



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 150

def resolve_internal_event_constant(event_name)
  event_str = event_name.to_s

  # Check if it's already an internal constant (starts with system prefixes)
  system_prefixes = %w[task. step. workflow. observability.]
  if system_prefixes.any? { |prefix| event_str.start_with?(prefix) }
    return event_str # It's a system event, use as-is
  end

  # Check if it's already prefixed with custom.
  if event_str.start_with?('custom.')
    return event_str # Already internal format
  end

  # Assume it's a custom event and add the prefix
  "custom.#{event_str}"
end

#safe_get(event, key, default = nil) ⇒ Object (protected)

Safe accessor for event payload keys with fallback values

Parameters:

  • event (Hash, Dry::Events::Event)

    The event payload or event object

  • key (Symbol, String)

    The key to access

  • default (Object) (defaults to: nil)

    The default value if key is missing

Returns:

  • (Object)

    The value or default



208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 208

def safe_get(event, key, default = nil)
  return default if event.nil?

  # Handle Dry::Events::Event objects
  if event.respond_to?(:payload)
    payload = event.payload
    return payload.fetch(key.to_sym) { payload.fetch(key.to_s, default) }
  end

  # Handle plain hash events
  event.fetch(key.to_sym) do
    event.fetch(key.to_s, default)
  end
end

#should_handle_event?(event_constant) ⇒ Boolean (protected)

Check if this subscriber should handle the given event

Parameters:

  • event_constant (String)

    The event constant

Returns:

  • (Boolean)

    Whether to handle this event



186
187
188
189
190
191
192
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 186

def should_handle_event?(event_constant)
  # Apply class-level filter if defined
  return false if self.class.event_filter && !self.class.event_filter.call(event_constant)

  # Apply instance-level filtering (override in subclasses)
  should_process_event?(event_constant)
end

#should_process_event?(_event_constant) ⇒ Boolean (protected)

Instance-level event filtering (override in subclasses)

Parameters:

  • _event_constant (String)

    The event constant (unused in base implementation)

Returns:

  • (Boolean)

    Whether to handle this event



198
199
200
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 198

def should_process_event?(_event_constant)
  true # Process all events by default
end

#subscribe_to_publisher(publisher) ⇒ void

This method returns an undefined value.

Subscribe to all events defined by the class

Parameters:



86
87
88
89
90
91
92
93
94
95
# File 'lib/tasker/events/subscribers/base_subscriber.rb', line 86

def subscribe_to_publisher(publisher)
  event_subscriptions.each do |event_constant, handler_method|
    # Apply filtering if defined
    next unless should_handle_event?(event_constant)

    # Subscribe to the event with automatic method routing
    # This will fail fast if the event doesn't exist, which is the correct behavior
    publisher.subscribe(event_constant, &method(handler_method))
  end
end