Module: Solana::Ruby::Kit::Subscribable::AsyncIterable

Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/subscribable/async_iterable.rb

Overview

Creates an Enumerator backed by a thread-safe Queue.

Mirrors createAsyncIterableFromDataPublisher from @solana/subscribable.

Usage:

enum = AsyncIterable.from_publisher(publisher, data_channel: :accountNotification)
enum.each { |notification| process(notification) }

The enumerator blocks on Queue#pop until the publisher closes or the optional timeout (in seconds, applied to each wait for the next item) expires. When the publisher emits on the error channel, the exception is re-raised inside the enumerator.

Constant Summary collapse

DONE =

Sentinel value pushed into the queue when the stream is done.

T.let(Object.new.freeze, Object)

Class Method Summary collapse

Class Method Details

.from_publisher(publisher, data_channel:, error_channel: :error, timeout: nil) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/solana/ruby/kit/subscribable/async_iterable.rb', line 38

# Builds an Enumerator that yields every payload the publisher emits on
# +data_channel+, buffering events in a Queue between the publisher and the
# consumer.
#
# Handlers are registered immediately (not on first iteration), so events
# emitted before the enumerator is consumed are buffered rather than lost.
# They are removed when an iteration ends for any reason: close event,
# timeout, error, or the consumer breaking out early.
#
# Once the handlers have been removed nothing can reach the queue any more,
# so a later pass over the same enumerator yields whatever is still buffered
# and then finishes instead of blocking forever.
#
# @param publisher [#on] object whose +on(channel) { |payload| ... }+ returns
#   a callable that removes the handler again
# @param data_channel [Object] channel whose payloads are yielded
# @param error_channel [Object] channel whose payloads are raised inside the
#   enumerator
# @param timeout [Numeric, nil] seconds to wait for each next item; when it
#   elapses the enumeration ends quietly. +nil+ waits indefinitely.
# @return [Enumerator] enumerator over the data payloads
# @raise [StandardError] the payload received on +error_channel+, raised from
#   within the enumerator while it is being consumed
def from_publisher(publisher, data_channel:, error_channel: :error, timeout: nil)
  queue = T.let(Queue.new, Queue)

  # Registration order is data, error, close; each entry is the callable that
  # undoes the corresponding subscription.
  unsubscribers = [
    publisher.on(data_channel) { |data| queue.push([:data, data]) },
    publisher.on(error_channel) { |err| queue.push([:error, err]) },
    publisher.on(DataPublisher::CLOSE_CHANNEL) { queue.push([:done, nil]) }
  ]

  # Idempotent: every callable is removed from the list before it runs, so a
  # second cleanup (e.g. after re-enumeration) never unsubscribes twice.
  cleanup = Kernel.lambda do
    while (unsubscribe = unsubscribers.shift)
      unsubscribe.call
    end
  end

  Enumerator.new do |yielder|
    Kernel.loop do
      # After cleanup no handler can push any more; popping an empty queue
      # would block forever, so stop once the buffered items are drained.
      break if unsubscribers.empty? && queue.empty?

      # NOTE(review): on Ruby >= 3.2, Queue#pop(timeout:) would avoid
      # Timeout's asynchronous interrupt — confirm the minimum Ruby version.
      kind, payload = if timeout
                        begin
                          Timeout.timeout(timeout) { queue.pop }
                        rescue Timeout::Error
                          [:done, nil]
                        end
                      else
                        queue.pop
                      end

      case kind
      when :data  then yielder.yield(payload)
      when :error then Kernel.raise T.cast(payload, StandardError)
      else             break
      end
    end
  ensure
    cleanup.call
  end
end