Class: Solana::Ruby::Kit::RpcSubscriptions::Transport
- Inherits:
- Object
- Object
- Solana::Ruby::Kit::RpcSubscriptions::Transport
- Extended by:
- T::Sig
- Defined in:
- lib/solana/ruby/kit/rpc_subscriptions/transport.rb
Overview
WebSocket JSON-RPC transport for Solana subscription methods. Runs in a background thread; dispatches incoming messages to a DataPublisher keyed by subscription ID.
Thread safety: all public methods are Mutex-protected.
Constant Summary
- MESSAGE_CHANNEL = T.let(:__message__, Symbol)
  Channel name used for raw incoming messages (before routing to sub-ID).
- ERROR_CHANNEL = T.let(:error, Symbol)
- CLOSE_CHANNEL = T.let(:close, Symbol)
Instance Attribute Summary
- #publisher ⇒ Object (readonly)
  Returns the value of attribute publisher.
- #url ⇒ Object (readonly)
  Returns the value of attribute url.
Instance Method Summary
- #close ⇒ Object
- #initialize(url:, headers: {}, send_buffer_high_watermark: 40) ⇒ Transport (constructor)
  A new instance of Transport.
- #request(method, params = []) ⇒ Object
- #subscribe(sub_id, &block) ⇒ Object
Constructor Details
#initialize(url:, headers: {}, send_buffer_high_watermark: 40) ⇒ Transport
Returns a new instance of Transport.
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 33
def initialize(url:, headers: {}, send_buffer_high_watermark: 40)
  @url = url
  @headers = headers
  @hwm = send_buffer_high_watermark
  @mutex = T.let(Mutex.new, Mutex)
  @id_seq = T.let(0, Integer)
  @pending = T.let({}, T::Hash[Integer, Queue]) # request id → response Queue
  @subscribers = T.let(
    Hash.new { |h, k| h[k] = [] },
    T::Hash[T.untyped, T::Array[T.proc.params(msg: T::Hash[String, T.untyped]).void]]
  )
  @send_buffer = T.let([], T::Array[String])
  @ws = T.let(nil, T.nilable(WebSocket::Client::Simple::Client))
  @connected = T.let(false, T::Boolean)
  @publisher = T.let(Subscribable::DataPublisher.new, Subscribable::DataPublisher)
  _connect
end
Instance Attribute Details
#publisher ⇒ Object (readonly)
Returns the value of attribute publisher.
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 86
def publisher
  @publisher
end
#url ⇒ Object (readonly)
Returns the value of attribute url.
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 24
def url
  @url
end
Instance Method Details
#close ⇒ Object
Closes the WebSocket connection, if one is open, and then closes the publisher.
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 89
def close
  @ws&.close
  @publisher.close
end
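#request(method, params = []) ⇒ Object
Sends a JSON-RPC 2.0 request and blocks until the response with the matching id arrives. Returns the response's result, or raises Rpc::RpcError if the response carries an error.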
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 57
def request(method, params = [])
  id = _next_id
  q = Queue.new
  @mutex.synchronize { @pending[id] = q }

  payload = JSON.generate({
    'jsonrpc' => '2.0',
    'id' => id,
    'method' => method,
    'params' => params
  })
  _send(payload)

  response = q.pop
  raise Rpc::RpcError.new(response['error']['message'], response['error']['code']) if response['error']
  response['result']
end
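The request method relies on a per-request Queue to hand the response from the background reader thread back to the caller. The following is a minimal, self-contained sketch of that correlation pattern, not the library's actual code: the class name PendingRequests and its register and resolve methods are hypothetical, and the real transport's incoming-message handling is not shown on this page.

```ruby
require 'json'

# Hypothetical stand-in for the transport's request/response correlation.
# Each outgoing request gets a fresh id and a Queue; the reader thread
# pushes the matching response onto that Queue.
class PendingRequests
  def initialize
    @mutex = Mutex.new
    @id_seq = 0
    @pending = {} # request id → response Queue
  end

  # Allocates the next request id and registers a Queue for its response.
  # Returns [id, queue].
  def register
    @mutex.synchronize do
      @id_seq += 1
      queue = Queue.new
      @pending[@id_seq] = queue
      [@id_seq, queue]
    end
  end

  # Routes one raw JSON frame to the caller waiting on its id.
  # Returns true if a waiter was found, false otherwise.
  def resolve(raw)
    msg = JSON.parse(raw)
    queue = @mutex.synchronize { @pending.delete(msg['id']) }
    return false unless queue

    queue << msg
    true
  end
end

pending = PendingRequests.new
id, queue = pending.register

# Simulate the reader thread delivering the server's reply.
reader = Thread.new do
  pending.resolve(JSON.generate({ 'jsonrpc' => '2.0', 'id' => id, 'result' => 'ok' }))
end

response = queue.pop # blocks until the reader pushes the matching response
reader.join
puts response['result']
```

Because Queue#pop blocks indefinitely, a caller built on this pattern waits forever if no response arrives; wrapping the wait in a timeout is one option an application might consider.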
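#subscribe(sub_id, &block) ⇒ Object
Registers block as a handler for the given subscription ID. Returns a lambda that removes the handler when called.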
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 79
def subscribe(sub_id, &block)
  @mutex.synchronize { T.must(@subscribers[sub_id]) << block }
  lambda { @mutex.synchronize { T.must(@subscribers[sub_id]).delete(block) } }
end
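The subscribe method follows a common pattern: register a handler under a key and return a closure that undoes the registration. The sketch below illustrates that pattern in isolation, without Sorbet or a WebSocket. HandlerRegistry and its dispatch method are hypothetical names introduced for illustration; how the real transport delivers notifications to handlers is not shown on this page.

```ruby
# Hypothetical registry mirroring the subscribe/unsubscribe shape above.
class HandlerRegistry
  def initialize
    @mutex = Mutex.new
    # Unknown keys get an empty handler list on first write access.
    @subscribers = Hash.new { |h, k| h[k] = [] }
  end

  # Registers the block for sub_id and returns a lambda that removes it.
  def subscribe(sub_id, &block)
    @mutex.synchronize { @subscribers[sub_id] << block }
    -> { @mutex.synchronize { @subscribers[sub_id].delete(block) } }
  end

  # Assumed dispatch step: copy the handler list under the lock, then
  # call the handlers outside it so a handler can safely unsubscribe.
  def dispatch(sub_id, msg)
    handlers = @mutex.synchronize { @subscribers.fetch(sub_id, []).dup }
    handlers.each { |handler| handler.call(msg) }
  end
end

registry = HandlerRegistry.new
received = []

unsubscribe = registry.subscribe(7) { |msg| received << msg }
registry.dispatch(7, { 'result' => 1 }) # delivered to the handler
unsubscribe.call
registry.dispatch(7, { 'result' => 2 }) # no handler left, so ignored

puts received.length
```

Returning the removal lambda keeps the handler's identity private to the caller, so no separate unsubscribe method or handle type is needed.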