Module: Solana::Ruby::Kit::Subscribable::AsyncIterable
- Extended by:
- T::Sig
- Defined in:
- lib/solana/ruby/kit/subscribable/async_iterable.rb
Overview
Creates an Enumerator backed by a thread-safe Queue.
Mirrors createAsyncIterableFromDataPublisher from @solana/subscribable.
Usage:
enum = AsyncIterable.from_publisher(publisher, data_channel: :accountNotification)
enum.each { |notification| process(notification) }
The enumerator blocks on Queue#pop until the publisher closes or the optional timeout expires. When the publisher emits on the error channel the exception is re-raised inside the enumerator.
Constant Summary collapse
- DONE =
Sentinel value pushed into the queue when the stream is done.
T.let(Object.new.freeze, Object)
Class Method Summary collapse
Class Method Details
.from_publisher(publisher, data_channel:, error_channel: :error, timeout: nil) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/solana/ruby/kit/subscribable/async_iterable.rb', line 38 def from_publisher(publisher, data_channel:, error_channel: :error, timeout: nil) queue = T.let(Queue.new, Queue) # Subscribe to data unsub_data = publisher.on(data_channel) { |data| queue.push([:data, data]) } # Subscribe to errors — re-raise inside the enumerator unsub_error = publisher.on(error_channel) { |err| queue.push([:error, err]) } # Subscribe to close unsub_close = publisher.on(DataPublisher::CLOSE_CHANNEL) { queue.push([:done, nil]) } cleanup = Kernel.lambda do unsub_data.call unsub_error.call unsub_close.call end Enumerator.new do |yielder| Kernel.loop do kind, payload = if timeout begin Timeout.timeout(timeout) { queue.pop } rescue Timeout::Error [:done, nil] end else queue.pop end case kind when :data then yielder.yield(payload) when :error then Kernel.raise T.cast(payload, StandardError) else break end end ensure cleanup.call end end |