Class: Solana::Ruby::Kit::Subscribable::DataPublisher

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/subscribable/data_publisher.rb

Overview

Thread-safe pub/sub hub — mirrors @solana/subscribable DataPublisher.

Channels are arbitrary symbols/strings. Subscribers are blocks called synchronously from #publish (in the calling thread). An optional signal lambda is checked before each dispatch; if it raises the subscriber is automatically removed.

Returns an unsubscribe lambda from #on.

Constant Summary collapse

ERROR_CHANNEL =
T.let(:error, Symbol)
CLOSE_CHANNEL =
T.let(:close, Symbol)

Instance Method Summary collapse

Constructor Details

#initializeDataPublisher

Returns a new instance of DataPublisher.



21
22
23
24
25
26
27
28
29
# File 'lib/solana/ruby/kit/subscribable/data_publisher.rb', line 21

def initialize
  # channel_name → [[subscriber_proc, signal_lambda_or_nil], ...]
  @subscribers = T.let(
    Hash.new { |h, k| h[k] = [] },
    T::Hash[T.untyped, T::Array[[T.proc.params(data: T.untyped).void, T.nilable(T.proc.void)]]]
  )
  @mutex  = T.let(Mutex.new, Mutex)
  @closed = T.let(false, T::Boolean)
end

Instance Method Details

#closeObject



78
79
80
81
82
83
84
# File 'lib/solana/ruby/kit/subscribable/data_publisher.rb', line 78

def close
  publish(CLOSE_CHANNEL, nil)
  @mutex.synchronize do
    @subscribers.clear
    @closed = true
  end
end

#closed?Boolean

Returns:

  • (Boolean)


87
# File 'lib/solana/ruby/kit/subscribable/data_publisher.rb', line 87

def closed? = @closed

#on(channel_name, signal: nil, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/solana/ruby/kit/subscribable/data_publisher.rb', line 43

def on(channel_name, signal: nil, &block)
  entry = [block, signal]
  @mutex.synchronize { T.must(@subscribers[channel_name]) << entry }

  # Return unsubscribe lambda
  lambda do
    @mutex.synchronize { T.must(@subscribers[channel_name]).delete(entry) }
  end
end

#publish(channel_name, data) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/solana/ruby/kit/subscribable/data_publisher.rb', line 56

def publish(channel_name, data)
  entries = @mutex.synchronize { (@subscribers[channel_name] || []).dup }
  entries.each do |subscriber, signal|
    begin
      signal&.call
    rescue StandardError
      # Signal fired — remove this subscriber and skip dispatch
      @mutex.synchronize { T.must(@subscribers[channel_name]).delete([subscriber, signal]) }
      next
    end
    begin
      subscriber.call(data)
    rescue StandardError => e
      # Dispatch errors are re-published on the error channel (not raised)
      publish(ERROR_CHANNEL, e) unless channel_name == ERROR_CHANNEL
    end
  end
end