Class: DSPy::EventRegistry

Inherits:
Object
  • Object
show all
Defined in:
lib/dspy/events.rb

Instance Method Summary collapse

Constructor Details

#initializeEventRegistry

Returns a new instance of EventRegistry.



12
13
14
15
16
# File 'lib/dspy/events.rb', line 12

def initialize
  @listeners = {}
  @subscription_counter = 0
  @mutex = Mutex.new
end

Instance Method Details

#clear_listenersObject



38
39
40
41
42
# File 'lib/dspy/events.rb', line 38

def clear_listeners
  @mutex.synchronize do
    @listeners.clear
  end
end

#notify(event_name, attributes) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/dspy/events.rb', line 44

def notify(event_name, attributes)
  # Take a snapshot of current listeners to avoid holding the mutex during execution
  # This allows listeners to be modified while others are executing
  matching_listeners = @mutex.synchronize do
    @listeners.select do |id, listener|
      pattern_matches?(listener[:pattern], event_name)
    end.dup  # Create a copy to avoid shared state
  end

  matching_listeners.each do |id, listener|
    begin
      listener[:block].call(event_name, attributes)
    rescue => e
      # Log the error but continue processing other listeners
      # Use emit_log directly to avoid infinite recursion
      DSPy.send(:emit_log, 'event.listener.error', {
        subscription_id: id,
        error_class: e.class.name,
        error_message: e.message,
        event_name: event_name
      })
    end
  end
end

#subscribe(pattern, &block) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/dspy/events.rb', line 18

def subscribe(pattern, &block)
  return unless block_given?
  
  subscription_id = SecureRandom.uuid
  @mutex.synchronize do
    @listeners[subscription_id] = {
      pattern: pattern,
      block: block
    }
  end
  
  subscription_id
end

#unsubscribe(subscription_id) ⇒ Object



32
33
34
35
36
# File 'lib/dspy/events.rb', line 32

def unsubscribe(subscription_id)
  @mutex.synchronize do
    @listeners.delete(subscription_id)
  end
end