Class: EventStoreRuby::MemoryEventStreamNotifier

Inherits:
Object
  • Object
show all
Defined in:
lib/eventstore_ruby/memory_event_stream_notifier.rb

Overview

In-memory notifier broadcasting EventRecord arrays to subscribers.

Instance Method Summary

Constructor Details

#initialize ⇒ MemoryEventStreamNotifier

Returns a new instance of MemoryEventStreamNotifier.



6
7
8
9
10
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 6

# Sets up an empty notifier.
#
# Starts with no subscriptions, a zeroed counter (used to build
# subscription ids) and the mutex that guards access to the
# subscription table.
def initialize
  @mutex = Mutex.new
  @counter = 0
  @subscriptions = {}
end

Instance Method Details

#close ⇒ Object



51
52
53
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 51

# Drops every registered subscription while holding the mutex.
#
# @return [Hash] the (now empty) subscriptions hash
def close
  @mutex.synchronize do
    @subscriptions.clear
  end
end

#notify(events) ⇒ Object

Notify subscribers concurrently; errors are rescued and printed.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 32

# Delivers +events+ to every current subscriber.
#
# Each handler is invoked inside its own Concurrent::Future and this method
# waits for all of them before returning. A StandardError raised by a handler
# is reported through Kernel.warn and does not propagate to the caller or
# prevent the other handlers from being invoked.
#
# @param events [Array<EventRecord>] events to broadcast; nothing happens
#   when it is empty
# @return [Array, nil] nil when +events+ is empty, otherwise the futures
#   that were waited on
def notify(events)
  return if events.empty?

  # Snapshot the handlers under the lock; they are invoked after it is
  # released, so the mutex is not held while handlers run.
  handlers = @mutex.synchronize { @subscriptions.values }

  pending = handlers.map do |handler|
    Concurrent::Future.execute do
      handler.call(events)
    rescue StandardError => error
      Kernel.warn "notifiers-memory-err01: Error notifying subscriber: #{error.message}\n#{error.backtrace.join("\n")}"
    end
  end

  # Block until every handler has finished (or failed).
  pending.each(&:wait)
end

#subscribe(handle) ⇒ Object

Subscribe with a handler Proc accepting events Array<EventRecord>. Returns EventSubscription.

Raises:

  • (ArgumentError)


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 14

# Registers +handle+ under a freshly generated subscription id.
#
# The id has the form "notifier-sub-N", where N comes from incrementing the
# counter under the mutex. The EventSubscription is built with that id and a
# lambda that deletes the entry from the subscription table, also under the
# mutex.
#
# @param handle [#call] callable stored as the subscriber
# @return [EventSubscription] built from the id and the unsubscribe lambda
# @raise [ArgumentError] if +handle+ does not respond to :call
def subscribe(handle)
  raise ArgumentError, 'handle must respond_to :call' unless handle.respond_to?(:call)

  # Allocate the id and store the handler atomically.
  id = @mutex.synchronize do
    key = "notifier-sub-#{@counter += 1}"
    @subscriptions[key] = handle
    key
  end

  unsubscribe = -> { @mutex.synchronize { @subscriptions.delete(id) } }

  EventSubscription.new(id, unsubscribe)
end