Class: Combi::WebSocket
- Inherits:
-
Bus
- Object
- Bus
- Combi::WebSocket
show all
- Defined in:
- lib/combi/buses/web_socket.rb
Defined Under Namespace
Classes: Client, Server
Constant Summary
Constants inherited
from Bus
Bus::RPC_DEFAULT_TIMEOUT, Bus::RPC_MAX_POLLS
Instance Attribute Summary collapse
Attributes inherited from Bus
#services
Instance Method Summary
collapse
-
#initialize(options) ⇒ WebSocket
constructor
A new instance of WebSocket.
-
#invoke_service(service_name, kind, payload) ⇒ Object
-
#manage_request(env, handler) ⇒ Object
-
#manage_ws_event(ws, handler) ⇒ Object
-
#on_message(ws, message, session = nil) ⇒ Object
-
#post_initialize ⇒ Object
-
#request(name, kind, message, options = {}) ⇒ Object
-
#respond_to(service_instance, handler, options = {}) ⇒ Object
-
#start! ⇒ Object
-
#stop! ⇒ Object
Methods inherited from Bus
#add_service, #enable, #log, #restart!
Constructor Details
#initialize(options) ⇒ WebSocket
Returns a new instance of WebSocket.
100
101
102
103
|
# File 'lib/combi/buses/web_socket.rb', line 100
# Builds the bus and starts with an empty handler registry.
#
# @param options [Object] forwarded unchanged to the superclass constructor
def initialize(options)
  super(options)
  # Registry of services, filled in later through #respond_to.
  @handlers = {}
end
|
Instance Attribute Details
#handlers ⇒ Object
Returns the value of attribute handlers.
98
99
100
|
# File 'lib/combi/buses/web_socket.rb', line 98
# Reader for the handler registry held in @handlers.
#
# @return [Object] the current value of @handlers
def handlers
  @handlers
end
|
#ready ⇒ Object
Returns the value of attribute ready.
98
99
100
|
# File 'lib/combi/buses/web_socket.rb', line 98
# Reader for the readiness object held in @ready.
#
# @return [Object] the current value of @ready
def ready
  @ready
end
|
Instance Method Details
#invoke_service(service_name, kind, payload) ⇒ Object
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
# File 'lib/combi/buses/web_socket.rb', line 187
# Looks up the service registered under +service_name+ and calls the method
# named +kind+ on it with +payload+.
#
# When the service is unknown, or does not respond to +kind+, a warning is
# logged and nil is returned (never the return value of the logger, which
# would otherwise leak into the reply sent to the peer).
#
# @param service_name [#to_s] name the service was registered under
# @param kind [String, Symbol] name of the method to call on the service
# @param payload [Object] single argument handed to the service method
# @return [Object, nil] whatever the service method returns, or nil when
#   nothing could be invoked
def invoke_service(service_name, kind, payload)
  handler = handlers[service_name.to_s]
  unless handler
    log "[WARNING] Service #{service_name} not found"
    log "[WARNING] handlers: #{handlers.keys.inspect}"
    return nil
  end

  service_instance = handler[:service_instance]
  # respond_to? raises TypeError for anything that is not a String/Symbol
  # (e.g. a message without 'kind'), so treat those as unsupported instead.
  valid_name = kind.is_a?(String) || kind.is_a?(Symbol)
  unless valid_name && service_instance.respond_to?(kind)
    log "[WARNING] Service #{service_name}(#{service_instance.class.name}) does not respond to message #{kind}"
    return nil
  end

  # NOTE(review): +kind+ may come from a remote peer; any public method of the
  # service (including ones inherited from Object) is reachable this way.
  # Consider an explicit allowlist per service. public_send at least keeps
  # private methods out of reach.
  service_instance.public_send(kind, payload)
end
|
#manage_request(env, handler) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/combi/buses/web_socket.rb', line 124
# Turns a Rack request into a Faye WebSocket connection and forwards its
# message/open/close events to @machine.
#
# The session returned by @machine.on_open is kept in a local shared by the
# three event blocks, so it is nil for any message that arrives before open.
#
# @param env [Object] Rack environment of the incoming request
# @param handler [Object] passed through to @machine.on_open
# @return [Object, nil] the socket's rack_response, or nil when the request
#   is not a WebSocket request
def manage_request(env, handler)
  require 'faye/websocket'
  return unless Faye::WebSocket.websocket?(env)

  socket = Faye::WebSocket.new(env)
  @ws = socket
  current_session = nil

  socket.on(:message) { |event| @machine.on_message(socket, current_session, event.data) }
  socket.on(:open) { |_event| current_session = @machine.on_open(socket, handler) }
  socket.on(:close) { |_event| @machine.on_close(current_session) }

  socket.rack_response
end
|
#manage_ws_event(ws, handler) ⇒ Object
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/combi/buses/web_socket.rb', line 146
# Registers message/open/close callbacks on +ws+ that forward to @machine.
#
# The session returned by @machine.on_open is kept in a local shared by the
# three callbacks, so it is nil for any message that arrives before open.
#
# @param ws [#onmessage, #onopen, #onclose] socket accepting callback blocks
# @param handler [Object] passed through to @machine.on_open
# @return [Object] whatever ws.onclose returns
def manage_ws_event(ws, handler)
  current_session = nil

  ws.onmessage { |raw| @machine.on_message(ws, current_session, raw) }
  ws.onopen { |_handshake| current_session = @machine.on_open(ws, handler) }
  ws.onclose { @machine.on_close(current_session) }
end
|
#on_message(ws, message, session = nil) ⇒ Object
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
# File 'lib/combi/buses/web_socket.rb', line 162
# Handles one decoded message received on +ws+.
#
# A message carrying both a 'correlation_id' and a 'result' key is an RPC
# reply: it is handed to @response_store and nothing is sent back.
# Anything else is a service request: the named service is invoked with the
# payload (with the session stored under 'session') and the outcome is sent
# back as JSON together with the request's correlation_id.
#
# @param ws [#send] socket used to deliver the reply
# @param message [Hash] decoded message with string keys
# @param session [Object, nil] session stored into the payload under 'session'
# @return [Object, nil] nil for RPC replies; otherwise the result of ws.send
#   or of registering the callback on a deferred outcome
def on_message(ws, message, session = nil)
  rpc_reply = message['correlation_id'] && message.key?('result')
  if rpc_reply
    @response_store.handle_rpc_response(message)
    log "Stored message with correlation_id #{message['correlation_id']} - #{message.inspect}"
    return
  end

  request_payload = message['payload'] || {}
  request_payload['session'] = session
  outcome = invoke_service(message['service'], message['kind'], request_payload)

  reply = { result: 'ok', correlation_id: message['correlation_id'] }
  # A proc (not a lambda) so it tolerates any argument count when used as a
  # deferrable callback, exactly like a plain block would.
  deliver = proc do |service_response|
    reply[:response] = service_response
    ws.send(reply.to_json)
  end

  # Immediate outcomes are sent right away.
  return deliver.call(outcome) unless outcome.respond_to?(:succeed)

  # Deferred outcomes reply once they succeed.
  # NOTE(review): no errback is registered here, so a failed deferrable
  # appears to produce no reply at all - confirm this is intended.
  outcome.callback(&deliver)
end
|
#post_initialize ⇒ Object
105
106
107
108
109
110
111
112
113
114
|
# File 'lib/combi/buses/web_socket.rb', line 105
# Prepares the collaborators of the bus: the readiness deferrable, the RPC
# response store and the machine that drives the connection.
#
# With @options[:remote_api] set a Client is built for that remote API;
# otherwise a Server is built.
#
# @return [Object] the machine that was created (Client or Server)
def post_initialize
  # Load EventMachine before its constant is referenced below. Previously the
  # require ran only in the client branch and after the first use.
  require 'eventmachine'

  @ready = EventMachine::DefaultDeferrable.new
  @response_store = Combi::ResponseStore.new
  @machine =
    if @options[:remote_api]
      Client.new(@options[:remote_api], @options[:handler], self)
    else
      Server.new(self)
    end
end
|
#request(name, kind, message, options = {}) ⇒ Object
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
# File 'lib/combi/buses/web_socket.rb', line 208
# Sends an RPC request over the web socket and returns a waiter for the reply.
#
# The message is sent from a callback on @ready, using @machine.ws when
# present and options[:ws] otherwise.
#
# @param name [Object] service name placed in the message
# @param kind [Object] message kind placed in the message
# @param message [Object] payload placed in the message
# @param options [Hash] :timeout (defaults to RPC_DEFAULT_TIMEOUT) and :ws
#   (fallback socket); the hash is only read, never modified
# @return [Object] the value returned by EventedWaiter.wait_for
def request(name, kind, message, options = {})
  # Read the timeout into a local instead of writing the default back into
  # the caller's hash (which also failed outright on frozen options).
  timeout = options[:timeout] || RPC_DEFAULT_TIMEOUT

  correlation_id = rand(10_000_000).to_s
  msg = {
    service: name,
    kind: kind,
    payload: message,
    correlation_id: correlation_id
  }

  # Register the waiter before sending so the reply cannot be missed.
  waiter = EventedWaiter.wait_for(correlation_id, @response_store, timeout)
  @ready.callback do |_ready_value|
    web_socket = @machine.ws || options[:ws]
    log "sending request #{msg.inspect}"
    web_socket.send msg.to_json
  end
  waiter
end
|
#respond_to(service_instance, handler, options = {}) ⇒ Object
202
203
204
205
206
|
# File 'lib/combi/buses/web_socket.rb', line 202
# Registers +service_instance+ under the name +handler+ so that incoming
# requests for that name can be routed to it.
#
# @param service_instance [Object] object that will receive the requests
# @param handler [#to_s] name to register the service under
# @param options [Hash] stored alongside the service instance
# @return [Object] whatever the final log call returns
def respond_to(service_instance, handler, options = {})
  log "registering #{handler}"

  registration = { service_instance: service_instance, options: options }
  handlers.store(handler.to_s, registration)

  log "handlers: #{handlers.keys.inspect}"
end
|
#start! ⇒ Object
116
117
118
|
# File 'lib/combi/buses/web_socket.rb', line 116
# Starts the bus by delegating to the underlying machine.
#
# @return [Object] whatever @machine.start! returns
def start!
  @machine.start!
end
|
#stop! ⇒ Object
120
121
122
|
# File 'lib/combi/buses/web_socket.rb', line 120
# Stops the bus by delegating to the underlying machine.
#
# @return [Object] whatever @machine.stop! returns
def stop!
  @machine.stop!
end
|