4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/magent/web_socket_server.rb', line 4
def initialize(options = {})
options = {:host => "0.0.0.0", :port => 34567}.merge(options)
$stdout.puts ">> Server running and up! #{options}}" if options[:debug]
EventMachine.run do
setup
EM.run do
EventMachine.add_periodic_timer(options.delete(:interval)||10) do
while message = Magent::WebSocketChannel.dequeue
if (channel = @channels[message["channel_id"]])
channel.push(message.to_json)
end
end
end
end
EventMachine::WebSocket.start(options) do |ws|
ws.onopen do
ws.onmessage do |msg|
data = JSON.parse(msg) rescue {}
if !handle_message(ws, data)
case data["id"]
when 'start'
if data["channel_id"].present? && (channel_id = validate_channel_id(data["channel_id"]))
key = generate_unique_key(data["key"])
@channels[channel_id] ||= EM::Channel.new
@channel_ids[key] = channel_id
sid = @channels[channel_id].subscribe { |msg| ws.send(msg) }
@sids[key] = sid
ws.onclose do
@channel_ids.delete(key)
@channels[channel_id].unsubscribe(sid)
@sids.delete(key)
end
ws.send({:id => "ack", :key => key}.to_json)
send(:on_ack, ws, channel_id) if respond_to?(:on_ack)
else
ws.close_connection
end
when 'chatmessage'
key = data["key"]
return invalid_key(ws) if key.blank? || @sids[key].blank?
channel_id = @channel_ids[key]
if channel_id
chat_message = {:id => 'chatmessage', :from => user_name(key, @sids[key]), :message => data["message"]}
@channels[channel_id].push(validate_chat_message(channel_id, chat_message).to_json)
else
ws.send({:id => 'announcement', :type => "error", :message => "cannot find the channel"}.to_json)
end
end
end end
end
end end
end
|