Class: Solana::Ruby::Kit::RpcSubscriptions::Transport

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/rpc_subscriptions/transport.rb

Overview

WebSocket JSON-RPC transport for Solana subscription methods. Runs in a background thread; dispatches incoming messages to a DataPublisher keyed by subscription ID.

Thread safety: all public methods are Mutex-protected.

Constant Summary collapse

MESSAGE_CHANNEL =

Channel name used for raw incoming messages (before routing to sub-ID).

T.let(:__message__, Symbol)
ERROR_CHANNEL =
T.let(:error, Symbol)
CLOSE_CHANNEL =
T.let(:close, Symbol)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, headers: {}, send_buffer_high_watermark: 40) ⇒ Transport

Returns a new instance of Transport.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 33

def initialize(url:, headers: {}, send_buffer_high_watermark: 40)
  @url        = url
  @headers    = headers
  @hwm        = send_buffer_high_watermark
  @mutex      = T.let(Mutex.new, Mutex)
  @id_seq     = T.let(0, Integer)
  @pending     = T.let({}, T::Hash[Integer, Queue])   # request id → response Queue
  @subscribers = T.let(
    Hash.new { |h, k| h[k] = [] },
    T::Hash[T.untyped, T::Array[T.proc.params(msg: T::Hash[String, T.untyped]).void]]
  )
  @send_buffer  = T.let([], T::Array[String])
  @ws           = T.let(nil, T.nilable(WebSocket::Client::Simple::Client))
  @connected    = T.let(false, T::Boolean)
  @publisher    = T.let(Subscribable::DataPublisher.new, Subscribable::DataPublisher)

  _connect
end

Instance Attribute Details

#publisherObject (readonly)

Returns the value of attribute publisher.



86
87
88
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 86

def publisher
  @publisher
end

#urlObject (readonly)

Returns the value of attribute url.



24
25
26
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 24

def url
  @url
end

Instance Method Details

#closeObject



89
90
91
92
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 89

def close
  @ws&.close
  @publisher.close
end

#request(method, params = []) ⇒ Object

Raises:



57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 57

def request(method, params = [])
  id     = _next_id
  q      = Queue.new
  @mutex.synchronize { @pending[id] = q }

  payload = JSON.generate({ 'jsonrpc' => '2.0', 'id' => id, 'method' => method, 'params' => params })
  _send(payload)

  response = q.pop
  raise Rpc::RpcError.new(response['error']['message'], response['error']['code']) if response['error']

  response['result']
end

#subscribe(sub_id, &block) ⇒ Object



79
80
81
82
# File 'lib/solana/ruby/kit/rpc_subscriptions/transport.rb', line 79

def subscribe(sub_id, &block)
  @mutex.synchronize { T.must(@subscribers[sub_id]) << block }
  lambda { @mutex.synchronize { T.must(@subscribers[sub_id]).delete(block) } }
end