Class: Ki::Middleware::Realtime

Inherits:
Object
  • Object
show all
Includes:
BaseMiddleware
Defined in:
lib/ki/middleware/realtime.rb

Instance Method Summary collapse

Methods included from BaseMiddleware

#initialize

Instance Method Details

#call(env) ⇒ Object



8
9
10
11
12
13
14
15
16
17
# File 'lib/ki/middleware/realtime.rb', line 8

# Rack entry point for the realtime middleware.
#
# Dispatches on the request path:
# * '/realtime/info' is answered by show_stats;
# * '/realtime' is handed to handle_websocket when Faye reports the request
#   as a websocket handshake;
# * anything else is forwarded to the wrapped @app.
#
# @param env [Hash] the Rack environment
# @return [Object] the result of whichever handler was selected
def call(env)
  path = BaseRequest.new(env).path.to_s

  return show_stats if path == '/realtime/info'
  return handle_websocket(env) if path == '/realtime' && Faye::WebSocket.websocket?(env)

  @app.call(env)
end

#channel_manager_action(json, ws, action) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/ki/middleware/realtime.rb', line 73

# Runs a channel operation on Ki::ChannelManager and sends its result to the
# client, or sends an error message when the request names no channel.
#
# @param json [Hash] parsed client message; 'channel_name' must be truthy
# @param ws [Object] websocket handle, passed through to ws_send
# @param action [String, Symbol] name of the Ki::ChannelManager method to call
# @return [Object] whatever ws_send returns
def channel_manager_action(json, ws, action)
  return ws_send(ws, { message: 'Please specify a channel_name' }) unless json['channel_name']

  # public_send (rather than send) respects method visibility, so only the
  # public ChannelManager API can be reached through the action name.
  ws_send(ws, ::Ki::ChannelManager.public_send(action, json))
end

#handle_websocket(env) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/ki/middleware/realtime.rb', line 32

# Upgrades the request to a websocket and wires it to Ki::ChannelManager.
#
# Registers a new socket with the channel manager and sends that socket
# record to the client straight away. Incoming frames are forwarded to
# on_message. A one-second periodic timer asks the channel manager for
# queued messages and pushes them to the client when there are any. On close
# the timer is cancelled and the socket is disconnected from the manager.
#
# @param env [Hash] the Rack environment of the handshake request
# @return [Object] the websocket's rack_response
def handle_websocket(env)
  connection = Faye::WebSocket.new(env)

  client = ::Ki::ChannelManager.connect
  ws_send(connection, client)

  connection.on(:message) { |event| on_message(connection, client, event.data) }

  poller = EventMachine::PeriodicTimer.new(1) do
    pending = ::Ki::ChannelManager.tick(socket_id: client['id'])
    next unless pending.count.positive?

    ws_send(connection, { messages: pending })
  end

  connection.on(:close) do
    poller.cancel
    ::Ki::ChannelManager.disconnect(client)
    # Drop the reference shared by the blocks above once the socket is closed.
    connection = nil
  end

  connection.rack_response
end

#on_message(ws, socket, data) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/ki/middleware/realtime.rb', line 56

# Handles one frame received from a websocket client.
#
# The payload must be a JSON object with a 'type' of 'subscribe',
# 'unsubscribe' or 'publish'. The socket's id is written into the message as
# 'socket_id' before it is handed to the channel manager. Every outcome,
# including each kind of malformed input, is reported back with ws_send.
#
# @param ws [Object] websocket handle, passed through to ws_send
# @param socket [Hash] socket record; its 'id' is copied into the message
# @param data [Object] raw frame payload, expected to be a JSON string
# @return [Object] whatever the final ws_send / channel_manager_action returns
def on_message(ws, socket, data)
  # JSON.parse raises TypeError (not ParserError) for payloads that are not
  # string-like, which the rescue below would not catch.
  return ws_send(ws, { message: 'Please send a valid json string' }) unless data.respond_to?(:to_str)

  json = JSON.parse(data)
  # Valid JSON need not be an object ("[1, 2]", "null", "5"); indexing such a
  # value with string keys below would raise instead of answering the client.
  return ws_send(ws, { message: 'Please send a json object' }) unless json.is_a?(Hash)

  json['socket_id'] = socket['id']
  case json['type']
  when 'subscribe', 'publish'
    # The type is one of the two literals above, so it is safe as an action name.
    channel_manager_action(json, ws, json['type'])
  when 'unsubscribe'
    ws_send(ws, ::Ki::ChannelManager.unsubscribe(json))
  else
    ws_send(ws, { message: 'Please specify a valid type' })
  end
rescue JSON::ParserError
  ws_send(ws, { message: 'Please send a valid json string' })
end

#show_stats ⇒ Object



23
24
25
26
27
28
29
30
# File 'lib/ki/middleware/realtime.rb', line 23

# Builds the JSON response for the stats endpoint.
#
# The body is a JSON object whose "sockets" key holds whatever
# Ki::ChannelManager.sockets returns; the status is 200 and the content type
# is application/json.
#
# @return [Object] the finished Rack::Response
def show_stats
  payload = { sockets: ::Ki::ChannelManager.sockets }.to_json

  Rack::Response.new(payload, 200).tap do |response|
    response['Content-Type'] = 'application/json'
  end.finish
end

#ws_send(ws, hash) ⇒ Object



19
20
21
# File 'lib/ki/middleware/realtime.rb', line 19

# Serialises a value with to_json and writes it to the websocket.
#
# @param ws [Object] websocket handle; its own #send(data) method is called
# @param hash [Object] value to serialise (anything responding to to_json)
# @return [Object] whatever ws.send returns
def ws_send(ws, hash)
  payload = hash.to_json.to_s
  ws.send(payload)
end