Class: RJR::Nodes::WS
Overview
Web socket node definition, listen for and invoke json-rpc requests via web sockets
Clients should specify the hostname / port when listening for and invoking requests.
note the RJR javascript client also supports sending / receiving json-rpc messages over web sockets
Constant Summary collapse
- RJR_NODE_TYPE =
:ws- PERSISTENT_NODE =
true
Instance Attribute Summary
Attributes inherited from RJR::Node
#dispatcher, #message_headers, #node_id
Instance Method Summary collapse
-
#initialize(args = {}) ⇒ WS
constructor
WS initializer.
-
#invoke(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for / return response.
-
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests.
-
#notify(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated).
-
#send_msg(data, ws) ⇒ Object
Send data using specified websocket safely.
- #to_s ⇒ Object
Methods inherited from RJR::Node
#clear_event_handlers, em, #halt, #join, #node_type, #on, #persistent?, persistent?, tp
Constructor Details
#initialize(args = {}) ⇒ WS
WS initializer
88 89 90 91 92 93 94 95 |
# File 'lib/rjr/nodes/ws.rb', line 88 def initialize(args = {}) super(args) @host = args[:host] @port = args[:port] @connections = [] @connections_lock = Mutex.new end |
Instance Method Details
#invoke(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc request, and wait for / return response
Implementation of RJR::Node#invoke
Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/rjr/nodes/ws.rb', line 134 def invoke(uri, rpc_method, *args) = RequestMessage.new :method => rpc_method, :args => args, :headers => @@em.schedule { init_client(uri) do |c| c.stream { |msg| (msg.data, c) } c.send_msg .to_s end } # TODO optional timeout for response ? result = wait_for_result() if result.size > 2 raise Exception, result[2] end return result[1] end |
#listen ⇒ Object
Instruct Node to start listening for and dispatching rpc requests
Implementation of RJR::Node#listen
111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rjr/nodes/ws.rb', line 111 def listen @@em.schedule do EventMachine::WebSocket.run(:host => @host, :port => @port) do |ws| ws.onopen { } ws.onclose { @connection_event_handlers[:closed].each { |h| h.call self } } ws.onerror { |e| @connection_event_handlers[:error].each { |h| h.call self } } ws. { |msg| (msg, ws) } end end self end |
#notify(uri, rpc_method, *args) ⇒ Object
Instructs node to send rpc notification (immadiately returns / no response is generated)
Implementation of RJR::Node#notify
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/rjr/nodes/ws.rb', line 164 def notify(uri, rpc_method, *args) # will block until message is published published_l = Mutex.new published_c = ConditionVariable.new invoked = false = NotificationMessage.new :method => rpc_method, :args => args, :headers => @@em.schedule { init_client(uri) do |c| c.stream { |msg| (msg.data, c) } c.send_msg .to_s # XXX same issue w/ tcp node, due to nature of event machine # we aren't guaranteed that message is actually written to socket # here, process must be kept alive until data is sent or will be lost published_l.synchronize { invoked = true ; published_c.signal } end } published_l.synchronize { published_c.wait published_l unless invoked } nil end |
#send_msg(data, ws) ⇒ Object
Send data using specified websocket safely
Implementation of RJR::Node#send_msg
104 105 106 |
# File 'lib/rjr/nodes/ws.rb', line 104 def send_msg(data, ws) @@em.schedule { ws.send(data) } end |
#to_s ⇒ Object
97 98 99 |
# File 'lib/rjr/nodes/ws.rb', line 97 def to_s "RJR::Nodes::WS<#{@node_id},#{@host},#{@port}>" end |