Class: DSPy::EventRegistry
- Inherits: Object
  - Object
    - DSPy::EventRegistry
- Defined in:
- lib/dspy/events.rb
Instance Method Summary collapse
- #clear_listeners ⇒ Object
- #initialize ⇒ EventRegistry (constructor): A new instance of EventRegistry.
- #notify(event_name, attributes) ⇒ Object
- #subscribe(pattern, &block) ⇒ Object
- #unsubscribe(subscription_id) ⇒ Object
Constructor Details
#initialize ⇒ EventRegistry
Returns a new instance of EventRegistry.
12 13 14 15 16 |
# File 'lib/dspy/events.rb', line 12
#
# Sets up an empty registry: an empty listener table, a subscription counter
# starting at zero, and the mutex the other registry methods synchronize on
# when they touch the listener table.
#
# NOTE(review): @subscription_counter is not read by any method visible in
# this excerpt (subscription ids come from SecureRandom.uuid) -- confirm
# whether it is still needed.
def initialize
  @listeners, @subscription_counter = {}, 0
  @mutex = Mutex.new
end
Instance Method Details
#clear_listeners ⇒ Object
38 39 40 41 42 |
# File 'lib/dspy/events.rb', line 38
#
# Removes every registered listener while holding the registry mutex.
#
# @return [Hash] the listener table itself, now empty (the result of Hash#clear)
def clear_listeners
  @mutex.synchronize { @listeners.clear }
end
#notify(event_name, attributes) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/dspy/events.rb', line 44
#
# Delivers an event to every listener whose pattern matches +event_name+.
#
# The matching listeners are collected while holding the mutex, but the
# listener blocks run after it has been released, so the listener table can be
# modified while listeners are executing.
#
# An exception raised by one listener is logged and does not prevent the
# remaining listeners from running.
#
# @param event_name [Object] name of the event; passed to pattern_matches?
#   together with each listener's pattern, and forwarded to the listener blocks
# @param attributes [Object] payload forwarded unchanged to each listener block
# @return [Hash] the snapshot of matching listeners, keyed by subscription id
def notify(event_name, attributes)
  # Hash#select already returns a new Hash, so the snapshot needs no extra copy.
  matching_listeners = @mutex.synchronize do
    @listeners.select do |_id, listener|
      pattern_matches?(listener[:pattern], event_name)
    end
  end

  matching_listeners.each do |id, listener|
    begin
      listener[:block].call(event_name, attributes)
    rescue => e
      # Log the error but continue processing other listeners.
      # Use emit_log directly to avoid infinite recursion.
      DSPy.send(:emit_log, 'event.listener.error', {
        subscription_id: id,
        error_class: e.class.name,
        error_message: e.message,
        event_name: event_name
      })
    end
  end
end
#subscribe(pattern, &block) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/dspy/events.rb', line 18
#
# Registers +block+ as a listener for +pattern+.
#
# @param pattern [Object] stored as-is alongside the block; notify hands it to
#   pattern_matches? when an event is dispatched
# @yield the listener; notify calls it with (event_name, attributes)
# @return [String, nil] a freshly generated UUID identifying the subscription,
#   or nil (and nothing is registered) when no block is given
def subscribe(pattern, &block)
  return if block.nil?

  SecureRandom.uuid.tap do |token|
    entry = { pattern: pattern, block: block }
    @mutex.synchronize { @listeners[token] = entry }
  end
end
#unsubscribe(subscription_id) ⇒ Object
32 33 34 35 36 |
# File 'lib/dspy/events.rb', line 32
#
# Removes the listener registered under +subscription_id+ while holding the
# registry mutex.
#
# @param subscription_id [Object] key of the listener to remove (subscribe
#   returns these as UUID strings)
# @return [Hash, nil] the removed listener entry, or nil when the id is unknown
def unsubscribe(subscription_id)
  @mutex.synchronize { @listeners.delete(subscription_id) }
end