Class: RJR::Nodes::WS
Overview
Web socket node definition: listens for and invokes JSON-RPC requests over web sockets.
Clients should specify the hostname / port when listening for and invoking requests.
Note that the RJR JavaScript client also supports sending / receiving JSON-RPC messages over web sockets.
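For orientation, the payloads exchanged over the socket are JSON-RPC messages. The sketch below builds a request and a notification in the shape defined by the JSON-RPC 2.0 specification, using only the Ruby standard library. It does not use RJR's own RequestMessage / NotificationMessage classes, whose exact serialized form (for example, how message headers are embedded) is not shown on this page. The method names, params and id are made up for illustration.

```ruby
require 'json'

# A request carries an id so the response can be matched to it.
request = {
  'jsonrpc' => '2.0',
  'method'  => 'add',      # hypothetical method name
  'params'  => [1, 2],
  'id'      => 'req-1'     # hypothetical id
}

# A notification has no id, so the receiver generates no response.
notification = {
  'jsonrpc' => '2.0',
  'method'  => 'log',      # hypothetical method name
  'params'  => ['hello']
}

request_json      = JSON.generate(request)
notification_json = JSON.generate(notification)
```

The presence or absence of the id is what separates the #invoke case (wait for a response) from the #notify case (fire and forget).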
Constant Summary
- RJR_NODE_TYPE = :ws
Instance Attribute Summary
Attributes inherited from RJR::Node
#dispatcher, #message_headers, #node_id
Instance Method Summary
- #initialize(args = {}) ⇒ WS (constructor)
  WS initializer.
- #invoke(uri, rpc_method, *args) ⇒ Object
  Instructs the node to send an rpc request and wait for / return the response.
- #listen ⇒ Object
  Instructs the node to start listening for and dispatching rpc requests.
- #notify(uri, rpc_method, *args) ⇒ Object
  Instructs the node to send an rpc notification (returns immediately / no response is generated).
- #send_msg(data, ws) ⇒ Object
  Send data using specified websocket safely.
- #to_s ⇒ Object
Methods inherited from RJR::Node
em, #halt, #join, #node_type, #on, tp
Constructor Details
#initialize(args = {}) ⇒ WS
WS initializer
# File 'lib/rjr/nodes/ws.rb', line 87
def initialize(args = {})
  super(args)
  @host = args[:host]
  @port = args[:port]
  @connections = []
  @connections_lock = Mutex.new
end
Instance Method Details
#invoke(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for / return response
Implementation of RJR::Node#invoke
Do not call this directly from the em event loop or from a callback: the call blocks, which stalls the message subscription used to receive responses.
# File 'lib/rjr/nodes/ws.rb', line 133
def invoke(uri, rpc_method, *args)
  = RequestMessage.new :method => rpc_method,
                       :args => args,
                       :headers => @message_headers

  @@em.schedule {
    init_client(uri) do |c|
      c.stream { |msg| (msg, c) }

      c.send_msg .to_s
    end
  }

  # TODO optional timeout for response ?
  result = wait_for_result()

  if result.size > 2
    raise Exception, result[2]
  end
  return result[1]
end
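The tail of invoke treats the awaited result as an array: index 1 holds the return value, and a third element, when present, is an error to raise. A minimal sketch of that convention follows. `unwrap_result` and the sample arrays are hypothetical, and the meaning of element 0 is not shown in the listing, so a placeholder is used for it.

```ruby
# Hypothetical helper mirroring the last lines of invoke.
def unwrap_result(result)
  # A third element signals a failure reported for the request.
  raise Exception, result[2] if result.size > 2
  result[1]
end

# Success: two elements, the value sits at index 1.
ok = unwrap_result([:placeholder, 42])

# Failure: the third element becomes the exception message.
error_message =
  begin
    unwrap_result([:placeholder, nil, 'method not found'])
  rescue Exception => e
    e.message
  end
```

Because the listing raises Exception rather than a StandardError subclass, a bare `rescue` in calling code will not catch it; callers would need `rescue Exception`, as in the sketch.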
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests
Implementation of RJR::Node#listen
# File 'lib/rjr/nodes/ws.rb', line 110
def listen
  @@em.schedule do
    EventMachine::WebSocket.run(:host => @host, :port => @port) do |ws|
      ws.onopen { }
      ws.onclose { @connection_event_handlers[:closed].each { |h| h.call self } }
      ws.onerror { |e| @connection_event_handlers[:error].each { |h| h.call self } }
      ws. { |msg| (msg, ws) }
    end
  end
  self
end
#notify(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immediately returns / no response is generated)
Implementation of RJR::Node#notify
# File 'lib/rjr/nodes/ws.rb', line 163
def notify(uri, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  = NotificationMessage.new :method => rpc_method,
                            :args => args,
                            :headers => @message_headers
  @@em.schedule {
    init_client(uri) do |c|
      c.stream { |msg| (msg, c) }

      c.send_msg .to_s

      # XXX same issue w/ tcp node, due to nature of event machine
      # we aren't guaranteed that message is actually written to socket
      # here, process must be kept alive until data is sent or will be lost
      published_l.synchronize { invoked = true ; published_c.signal }
    end
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  nil
end
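The blocking in notify comes down to a mutex / condition variable handshake between the caller and the scheduled block. The sketch below reproduces that handshake with a plain Thread standing in for the EventMachine reactor, so it runs without any gems. `publish_and_wait` and the `sent` array are hypothetical names used only for illustration.

```ruby
# Sketch of the wait-until-published handshake used by notify.
# Assumption: a plain Thread stands in for the EventMachine reactor.
def publish_and_wait(sent, payload)
  published_l = Mutex.new
  published_c = ConditionVariable.new
  invoked     = false

  worker = Thread.new do
    sent << payload # stands in for handing the message to the socket
    published_l.synchronize do
      invoked = true
      published_c.signal
    end
  end

  # The flag is checked under the lock, so a signal sent before the
  # caller reaches this point is not lost.
  published_l.synchronize do
    published_c.wait(published_l) until invoked
  end

  worker.join
  nil
end

sent   = []
result = publish_and_wait(sent, 'ping')
```

The listing guards the wait with `unless invoked`; the sketch uses an `until` loop instead, which additionally tolerates spurious wakeups. As the XXX comment in the listing points out, the signal only means the message was handed off, not that it has been written to the socket.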
#send_msg(data, ws) ⇒ Object
Send data using specified websocket safely
Implementation of RJR::Node#send_msg
# File 'lib/rjr/nodes/ws.rb', line 103
def send_msg(data, ws)
  ws.send(data)
end
#to_s ⇒ Object
# File 'lib/rjr/nodes/ws.rb', line 96
def to_s
  "RJR::Nodes::WS<#{@node_id},#{@host},#{@port}>"
end