Class: EventStoreRuby::MemoryEventStreamNotifier
- Inherits:
-
Object
- Object
- EventStoreRuby::MemoryEventStreamNotifier
- Defined in:
- lib/eventstore_ruby/memory_event_stream_notifier.rb
Overview
In-memory notifier broadcasting EventRecord arrays to subscribers.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize ⇒ MemoryEventStreamNotifier
constructor
A new instance of MemoryEventStreamNotifier.
-
#notify(events) ⇒ Object
Notify subscribers concurrently; errors are rescued and printed.
-
#subscribe(handle) ⇒ Object
Subscribe with a handler Proc accepting events Array<EventRecord>.
Constructor Details
#initialize ⇒ MemoryEventStreamNotifier
Returns a new instance of MemoryEventStreamNotifier.
6 7 8 9 10 |
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 6 def initialize @subscriptions = {} @counter = 0 @mutex = Mutex.new end |
Instance Method Details
#close ⇒ Object
51 52 53 |
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 51 def close @mutex.synchronize { @subscriptions.clear } end |
#notify(events) ⇒ Object
Notify subscribers concurrently; errors are rescued and printed.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 32 def notify(events) return if events.empty? subs = nil @mutex.synchronize { subs = @subscriptions.values.dup } # run each handler in a future so notification order is preserved per subscription but not blocked by others futures = subs.map do |handle| Concurrent::Future.execute do begin handle.call(events) rescue StandardError => e Kernel.warn "notifiers-memory-err01: Error notifying subscriber: #{e.}\n#{e.backtrace.join("\n")}" end end end futures.each(&:wait) # Equivalent to Promise.allSettled end |
#subscribe(handle) ⇒ Object
Subscribe with a handler Proc accepting events Array<EventRecord>. Returns EventSubscription.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/eventstore_ruby/memory_event_stream_notifier.rb', line 14 def subscribe(handle) raise ArgumentError, 'handle must respond_to :call' unless handle.respond_to?(:call) id = nil unsubscribe_proc = nil @mutex.synchronize do id = "notifier-sub-#{@counter += 1}" @subscriptions[id] = handle unsubscribe_proc = -> { @mutex.synchronize { @subscriptions.delete(id) } } end EventSubscription.new(id, unsubscribe_proc) end |