Class: Combi::WebSocket

Inherits:
Bus
  • Object
show all
Defined in:
lib/combi/buses/web_socket.rb

Defined Under Namespace

Classes: Client, Server

Constant Summary

Constants inherited from Bus

Bus::RPC_DEFAULT_TIMEOUT, Bus::RPC_MAX_POLLS

Instance Attribute Summary collapse

Attributes inherited from Bus

#services

Instance Method Summary collapse

Methods inherited from Bus

#add_service, #enable, #log, #restart!

Constructor Details

#initialize(options) ⇒ WebSocket

Returns a new instance of WebSocket.



100
101
102
103
# File 'lib/combi/buses/web_socket.rb', line 100

# Builds a WebSocket bus with an empty handler registry.
#
# @param options [Object] forwarded unchanged to the superclass constructor
def initialize(options)
  # Bare super passes +options+ on to the parent initializer.
  super
  # No handlers are registered yet; #respond_to adds entries to this Hash.
  @handlers = {}
end

Instance Attribute Details

#handlers ⇒ Object (readonly)

Returns the value of attribute handlers.



98
99
100
# File 'lib/combi/buses/web_socket.rb', line 98

# Read-only accessor for the handler registry.
#
# @return [Hash] the value of @handlers, which #initialize sets to an empty
#   Hash and #respond_to fills with entries keyed by handler name (String)
def handlers
  @handlers
end

#ready ⇒ Object (readonly)

Returns the value of attribute ready.



98
99
100
# File 'lib/combi/buses/web_socket.rb', line 98

# Read-only accessor for the readiness deferrable.
#
# @return [Object] the value of @ready, which #post_initialize assigns an
#   EventMachine::DefaultDeferrable; #request attaches its send step to it
def ready
  @ready
end

Instance Method Details

#invoke_service(service_name, kind, payload) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/combi/buses/web_socket.rb', line 187

# Dispatches a message to the service registered under +service_name+.
#
# @param service_name [#to_s] name the service was registered under
# @param kind [String, Symbol] name of the method to call on the service instance
# @param payload [Object] the single argument handed to that method
# @return [Object, nil] the service method's return value, or nil when no
#   service is registered under that name or it does not respond to +kind+
def invoke_service(service_name, kind, payload)
  handler = handlers[service_name.to_s]
  unless handler
    log "[WARNING] Service #{service_name} not found"
    log "[WARNING] handlers: #{handlers.keys.inspect}"
    # Explicit nil: otherwise the logger's return value would be returned
    # to the caller as if it were the service's response.
    return nil
  end

  service_instance = handler[:service_instance]
  unless service_instance.respond_to?(kind)
    log "[WARNING] Service #{service_name}(#{service_instance.class.name}) does not respond to message #{kind}"
    return nil
  end

  # respond_to? only admits public methods; public_send keeps the dispatch
  # restricted to the public interface as well.
  service_instance.public_send(kind, payload)
end

#manage_request(env, handler) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/combi/buses/web_socket.rb', line 124

# Upgrades a Rack request to a WebSocket and routes its events to @machine.
#
# @param env [Hash] Rack environment of the incoming request
# @param handler [Object] passed through to @machine.on_open
# @return [Object, nil] the socket's async Rack response, or nil when the
#   request is not a WebSocket request
def manage_request(env, handler)
  require 'faye/websocket'

  return unless Faye::WebSocket.websocket?(env)

  socket = Faye::WebSocket.new(env)
  @ws = socket
  # Filled in by the :open callback; stays nil for anything arriving before it.
  session = nil

  socket.on(:message) { |event| @machine.on_message(socket, session, event.data) }
  socket.on(:open) { |_event| session = @machine.on_open(socket, handler) }
  socket.on(:close) { |_event| @machine.on_close(session) }

  # Async Rack response
  socket.rack_response
end

#manage_ws_event(ws, handler) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/combi/buses/web_socket.rb', line 146

# Wires an already-created socket's callbacks to @machine.
#
# @param ws [Object] socket exposing onmessage / onopen / onclose hooks
# @param handler [Object] passed through to @machine.on_open
# @return [Object] whatever ws.onclose returns
def manage_ws_event(ws, handler)
  # Filled in by the onopen callback; stays nil for anything arriving before it.
  session = nil

  ws.onmessage { |raw_message| @machine.on_message(ws, session, raw_message) }
  ws.onopen { |_handshake| session = @machine.on_open(ws, handler) }
  ws.onclose { @machine.on_close(session) }
end

#on_message(ws, message, session = nil) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/combi/buses/web_socket.rb', line 162

# Processes one decoded message received on +ws+.
#
# A message that has a 'correlation_id' and a 'result' key is treated as the
# reply to an earlier request and handed to @response_store. Anything else is
# treated as a service invocation whose outcome is sent back on +ws+ as JSON.
#
# @param ws [Object] socket the reply is written to (must respond to #send)
# @param message [Hash] decoded message with String keys
# @param session [Object, nil] stored under payload['session'] before dispatch
# @return [Object, nil] nil for a stored reply; otherwise the result of
#   ws.send, or of registering the callback when the service result is deferred
def on_message(ws, message, session = nil)
  correlation_id = message['correlation_id']

  if correlation_id && message.has_key?('result')
    @response_store.handle_rpc_response(message)
    log "Stored message with correlation_id #{correlation_id} - #{message.inspect}"
    return
  end

  payload = message['payload'] || {}
  payload['session'] = session
  outcome = invoke_service(message['service'], message['kind'], payload)

  reply = {result: 'ok', correlation_id: correlation_id}
  deliver = lambda do |value|
    reply[:response] = value
    ws.send(reply.to_json)
  end

  # Plain values are answered immediately.
  return deliver.call(outcome) unless outcome.respond_to?(:succeed)

  # Deferred results are answered once they succeed.
  outcome.callback { |service_response| deliver.call(service_response) }
end

#post_initialize ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/combi/buses/web_socket.rb', line 105

# Sets up the readiness deferrable, the response store and the machine
# (Client when @options[:remote_api] is set, Server otherwise).
#
# @return [Object] the Client or Server assigned to @machine
def post_initialize
  # Loaded before anything else: EventMachine::DefaultDeferrable is referenced
  # on the next line in both client and server mode, so requiring it only in
  # the client branch (after the reference) was too late.
  require 'eventmachine'

  @ready = EventMachine::DefaultDeferrable.new
  @response_store = Combi::ResponseStore.new
  @machine =
    if @options[:remote_api]
      Client.new(@options[:remote_api], @options[:handler], self)
    else
      Server.new(self)
    end
end

#request(name, kind, message, options = {}) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/combi/buses/web_socket.rb', line 208

# Sends an RPC request over the socket once the bus is ready.
#
# @param name [Object] target service name, sent as the 'service' field
# @param kind [Object] method to invoke on the service, sent as 'kind'
# @param message [Object] request payload, sent as 'payload'
# @param options [Hash] :timeout overrides RPC_DEFAULT_TIMEOUT; :ws is the
#   socket used when @machine.ws is nil. The hash is not modified.
# @return [Object] the object returned by EventedWaiter.wait_for for this
#   request's correlation id
def request(name, kind, message, options = {})
  # Read the timeout into a local instead of writing the default back into
  # the caller's hash (which raised on frozen hashes and leaked into reuse).
  timeout = options[:timeout] || RPC_DEFAULT_TIMEOUT
  correlation_id = rand(10_000_000).to_s
  msg = {
    service: name,
    kind: kind,
    payload: message,
    correlation_id: correlation_id
  }
  # Register the waiter before sending so the reply cannot arrive unobserved.
  waiter = EventedWaiter.wait_for(correlation_id, @response_store, timeout)
  @ready.callback do |_|
    web_socket = @machine.ws || options[:ws]
    log "sending request #{msg.inspect}"
    web_socket.send msg.to_json
  end
  waiter
end

#respond_to(service_instance, handler, options = {}) ⇒ Object



202
203
204
205
206
# File 'lib/combi/buses/web_socket.rb', line 202

# Registers +service_instance+ as the receiver for messages addressed to
# +handler+.
#
# @param service_instance [Object] object whose methods will be invoked
# @param handler [#to_s] name under which the service is registered
# @param options [Hash] stored alongside the service instance
# @return [Object] whatever the final log call returns
def respond_to(service_instance, handler, options = {})
  log "registering #{handler}"
  entry = {service_instance: service_instance, options: options}
  handlers.store(handler.to_s, entry)
  log "handlers: #{handlers.keys.inspect}"
end

#start! ⇒ Object



116
117
118
# File 'lib/combi/buses/web_socket.rb', line 116

# Starts the bus by delegating to @machine (the Client or Server created in
# #post_initialize).
#
# @return [Object] whatever @machine.start! returns
def start!
  @machine.start!
end

#stop! ⇒ Object



120
121
122
# File 'lib/combi/buses/web_socket.rb', line 120

# Stops the bus by delegating to @machine (the Client or Server created in
# #post_initialize).
#
# @return [Object] whatever @machine.stop! returns
def stop!
  @machine.stop!
end