Class: Tasker::Registry::SubscriberRegistry

Inherits:
BaseRegistry show all
Includes:
Singleton
Defined in:
lib/tasker/registry/subscriber_registry.rb

Overview

Registry for managing event subscribers

Provides centralized management of event subscribers with discovery, registration, and event-based lookup capabilities. Integrates with the existing event system to provide comprehensive subscriber coordination.

Examples:

Register a subscriber

registry = Tasker::Registry::SubscriberRegistry.instance
registry.register(MySubscriber, events: ['task.created', 'task.completed'])

Find subscribers for an event

subscribers = registry.subscribers_for_event('task.created')

Auto-discover subscribers

registry.auto_discover_subscribers

Constant Summary

Constants included from Concerns::StructuredLogging

Concerns::StructuredLogging::CORRELATION_ID_KEY

Instance Method Summary collapse

Methods inherited from BaseRegistry

#base_stats, #health_check, #healthy?, #log_registration, #log_registry_error, #log_registry_operation, #log_unregistration, #log_validation_failure, #thread_safe_operation, #validate_registration_params!

Methods included from Concerns::StructuredLogging

#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id

Constructor Details

#initialize ⇒ SubscriberRegistry

Returns a new instance of SubscriberRegistry.



28
29
30
31
32
33
# File 'lib/tasker/registry/subscriber_registry.rb', line 28

# Set up an empty registry.
#
# Calls the parent initializer first, then creates the two Concurrent::Hash
# indexes this registry maintains and logs the 'initialized' operation with
# zero counts.
def initialize
  super
  # subscriber name => registration config (written by #register)
  @subscribers = Concurrent::Hash.new
  # event name (String) => names of the subscribers registered for that event
  @event_mappings = Concurrent::Hash.new
  log_registry_operation('initialized', total_subscribers: 0, total_events: 0)
end

Instance Method Details

#all_items ⇒ Hash

Get all registered subscribers (required by BaseRegistry)

Returns:

  • (Hash)

    All registered subscribers



132
133
134
# File 'lib/tasker/registry/subscriber_registry.rb', line 132

# Registry-agnostic accessor; simply delegates to #all_subscribers.
#
# @return [Hash] whatever #all_subscribers returns
def all_items
  all_subscribers
end

#all_subscribers ⇒ Hash

Get all registered subscribers

Returns:

  • (Hash)

    All registered subscribers



125
126
127
# File 'lib/tasker/registry/subscriber_registry.rb', line 125

# Return a copy of the subscriber index.
#
# The copy is shallow (`dup`): adding or removing keys on the result does not
# affect the registry, but the per-subscriber config hashes are the same
# objects the registry holds.
#
# @return [Hash] subscriber name => registration config
def all_subscribers
  @subscribers.dup
end

#auto_discover_subscribers(directory = nil) ⇒ Integer

Auto-discover subscribers in the subscribers directory

Parameters:

  • directory (String) (defaults to: nil)

    Directory to search for subscribers

Returns:

  • (Integer)

    Number of subscribers discovered



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/tasker/registry/subscriber_registry.rb', line 155

# Auto-discover subscriber classes from `*_subscriber.rb` files and register them.
#
# For every matching file the class name is derived from the file name
# (`foo_subscriber.rb` -> `Tasker::Events::Subscribers::FooSubscriber`), the
# file is required, and the class is registered with the events reported by
# `discover_subscriber_events`. A failure for one file -- including an error
# raised while loading it -- is logged via `log_registry_error` and does not
# stop discovery of the remaining files.
#
# @param directory [String, nil] directory to scan; when nil, the
#   `../events/subscribers` directory relative to this file is used
# @return [Integer] number of subscribers registered by this call
def auto_discover_subscribers(directory = nil)
  directory ||= File.join(__dir__, '..', 'events', 'subscribers')

  return 0 unless Dir.exist?(directory)

  discovered_count = 0

  Dir.glob(File.join(directory, '*_subscriber.rb')).each do |file|
    # Derive the expected constant from the filename up front so the error
    # log can name it even when loading the file itself fails.
    class_name = File.basename(file, '.rb').camelize
    full_class_name = "Tasker::Events::Subscribers::#{class_name}"

    begin
      # Expand the path: a relative path would be resolved against $LOAD_PATH
      # by `require` instead of against the scanned directory.
      require File.expand_path(file)

      subscriber_class = full_class_name.constantize

      # Auto-discover events from class methods or constants
      events = discover_subscriber_events(subscriber_class)

      register(subscriber_class, events: events, auto_discovered: true)
      discovered_count += 1

      log_registry_operation('auto_discovered_subscriber',
                             subscriber_name: subscriber_class.name,
                             events: events,
                             file_path: file)
    rescue StandardError, LoadError => e
      # LoadError is a ScriptError, not a StandardError, so it is listed
      # explicitly: a subscriber with a missing dependency must not abort
      # discovery of the others.
      log_registry_error('auto_discovery_failed', e,
                         file_path: file,
                         class_name: full_class_name)
    end
  end

  discovered_count
end

#clear! ⇒ Boolean

Clear all registered subscribers (required by BaseRegistry)

Returns:

  • (Boolean)

    True if cleared successfully



210
211
212
213
214
215
216
217
# File 'lib/tasker/registry/subscriber_registry.rb', line 210

# Empty the registry: drops every subscriber config and every event mapping,
# then logs a 'cleared_all' operation. All of this runs inside
# `thread_safe_operation`.
#
# @return [Boolean] the block's value (true), as handed back by
#   `thread_safe_operation`
def clear!
  thread_safe_operation do
    [@subscribers, @event_mappings].each(&:clear)
    log_registry_operation('cleared_all')
    true
  end
end

#has_subscribers?(event_name) ⇒ Boolean

Check if an event has any subscribers

Parameters:

  • event_name (String)

    Event name to check

Returns:

  • (Boolean)

    True if event has subscribers



147
148
149
# File 'lib/tasker/registry/subscriber_registry.rb', line 147

# Check whether at least one subscriber is mapped to an event.
#
# The mapping is read exactly once: this method does not run inside
# `thread_safe_operation`, so a separate `key?` check followed by a second
# lookup could observe the entry disappearing in between and call `empty?`
# on nil.
#
# @param event_name [String, Symbol] event name; converted with `to_s`
# @return [Boolean] true if the event has a non-empty subscriber list
def has_subscribers?(event_name)
  subscriber_names = @event_mappings[event_name.to_s]
  !(subscriber_names.nil? || subscriber_names.empty?)
end

#register(subscriber_class, events: [], replace: false, **options) ⇒ Boolean

Register an event subscriber

Parameters:

  • subscriber_class (Class)

    The subscriber class to register

  • events (Array<String>) (defaults to: [])

    Events this subscriber handles

  • replace (Boolean) (defaults to: false)

    Whether to replace existing subscriber

  • options (Hash)

    Additional registration options

Returns:

  • (Boolean)

    True if registration successful

Raises:

  • (ArgumentError)

    If subscriber already exists and replace is false



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/tasker/registry/subscriber_registry.rb', line 43

# Register an event subscriber class and index it by the events it handles.
#
# The registry key is the demodulized, underscored class name. Validation,
# the duplicate check and all index updates run inside `thread_safe_operation`.
#
# @param subscriber_class [Class] the subscriber class to register
# @param events [Array<String>, String, nil] events this subscriber handles;
#   a single name or nil is normalized to an array before anything is stored
# @param replace [Boolean] whether to replace an existing registration
# @param options [Hash] additional options stored with the registration
# @return [Boolean] true if registration succeeded
# @raise [ArgumentError] if interface validation fails, or if the subscriber
#   is already registered and replace is false
def register(subscriber_class, events: [], replace: false, **options)
  subscriber_name = subscriber_class.name.demodulize.underscore

  # Normalize before any state changes: a nil or bare String would otherwise
  # fail only after the subscriber was already stored, leaving the registry
  # half-updated.
  events = Array(events)

  thread_safe_operation do
    # Validate subscriber interface; log the failure, then let it propagate.
    begin
      Registry::InterfaceValidator.validate_subscriber!(subscriber_class)
    rescue ArgumentError => e
      log_validation_failure('event_subscriber', subscriber_name, e.message)
      raise
    end

    existing_config = @subscribers[subscriber_name]

    if existing_config
      unless replace
        raise ArgumentError, "Subscriber '#{subscriber_name}' already registered. Use replace: true to override."
      end

      # Replacing: detach the previous registration from the event index first.
      unregister_event_mappings(subscriber_name, existing_config[:events])
      log_unregistration('event_subscriber', subscriber_name, existing_config[:subscriber_class])
    end

    @subscribers[subscriber_name] = {
      subscriber_class: subscriber_class,
      events: events,
      registered_at: Time.current,
      options: options.merge(replace: replace)
    }

    # Index by events
    register_event_mappings(subscriber_name, events)

    log_registration('event_subscriber', subscriber_name, subscriber_class,
                     { events: events, event_count: events.size, **options })

    true
  end
end

#stats ⇒ Hash

Get comprehensive registry statistics

Returns:

  • (Hash)

    Detailed statistics about the registry



196
197
198
199
200
201
202
203
204
205
# File 'lib/tasker/registry/subscriber_registry.rb', line 196

# Build registry statistics: the inherited `base_stats` merged with
# subscriber-specific figures (subscriber-specific keys win on collision).
#
# @return [Hash] base stats plus :total_subscribers, :total_events,
#   :events_covered, :subscribers_by_event, :average_events_per_subscriber
#   and :most_popular_events
def stats
  inherited = base_stats

  subscriber_stats = {
    total_subscribers: @subscribers.size,
    total_events: @event_mappings.size,
    events_covered: @event_mappings.keys.sort,
    subscribers_by_event: @event_mappings.transform_values { |names| names.size },
    average_events_per_subscriber: calculate_average_events_per_subscriber,
    most_popular_events: find_most_popular_events
  }

  inherited.merge(subscriber_stats)
end

#subscribers_for_event(event_name) ⇒ Array<Hash>

Get subscribers for a specific event

Parameters:

  • event_name (String)

    The event name to find subscribers for

Returns:

  • (Array<Hash>)

    Array of subscriber configurations



117
118
119
120
# File 'lib/tasker/registry/subscriber_registry.rb', line 117

# Look up the registration configs of every subscriber mapped to an event.
#
# Names present in the event index but missing from the subscriber index are
# skipped; an event with no mapping yields an empty array.
#
# @param event_name [String, Symbol] event name; converted with `to_s`
# @return [Array<Hash>] subscriber configurations, in mapping order
def subscribers_for_event(event_name)
  names = @event_mappings[event_name.to_s]
  return [] unless names

  names.each_with_object([]) do |name, configs|
    config = @subscribers[name]
    configs << config if config
  end
end

#supported_events ⇒ Array<String>

Get supported events across all subscribers

Returns:

  • (Array<String>)

    Array of supported event names



139
140
141
# File 'lib/tasker/registry/subscriber_registry.rb', line 139

# List every event name present in the event index, sorted.
#
# @return [Array<String>] sorted event names
def supported_events
  # `keys` hands back a fresh array, so sorting it in place leaves the
  # registry's own ordering untouched.
  event_names = @event_mappings.keys
  event_names.sort!
end

#unregister(subscriber_class) ⇒ Boolean

Unregister an event subscriber

Parameters:

  • subscriber_class (Class, String)

    The subscriber class or name to unregister

Returns:

  • (Boolean)

    True if unregistered successfully



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/tasker/registry/subscriber_registry.rb', line 91

# Remove a subscriber and detach it from the event index.
#
# A Class is resolved to its registry key (demodulized, underscored class
# name); anything else is used as the key via `to_s`. The removal runs inside
# `thread_safe_operation`.
#
# @param subscriber_class [Class, String] subscriber class or registry key
# @return [Boolean] true if a registration was removed, false if none existed
def unregister(subscriber_class)
  registry_key =
    subscriber_class.is_a?(Class) ? subscriber_class.name.demodulize.underscore : subscriber_class.to_s

  thread_safe_operation do
    removed = @subscribers.delete(registry_key)
    next false unless removed

    # Remove from event mappings
    unregister_event_mappings(registry_key, removed[:events])
    log_unregistration('event_subscriber', registry_key, removed[:subscriber_class])
    true
  end
end