Class: PalavaMachine::Manager
- Inherits:
-
Object
- Object
- PalavaMachine::Manager
- Extended by:
- Forwardable
- Defined in:
- lib/palava_machine/manager.rb
Constant Summary collapse
- PAYLOAD_NEW_PEER =
lambda { |connection_id, status = nil| payload = { event: 'new_peer', peer_id: connection_id } payload[:status] = status if status payload.to_json }
- PAYLOAD_PEER_LEFT =
lambda { |connection_id| { event: 'peer_left', sender_id: connection_id, }.to_json }
- SCRIPT_JOIN_ROOM =
<<-LUA local members = redis.call('smembers', KEYS[1]) local count = 0 for _, peer_id in pairs(members) do redis.call('publish', "ps:connection:" .. peer_id, ARGV[2]) count = count + 1 end redis.call('sadd', KEYS[1], ARGV[1]) if count == 0 or tonumber(redis.call('get', KEYS[2])) <= count then redis.call('set', KEYS[2], count + 1) end redis.call('set', KEYS[3], ARGV[3]) redis.call('set', KEYS[4], ARGV[4]) return members LUA
- SCRIPT_LEAVE_ROOM =
<<-LUA redis.call('hincrby', KEYS[7], math.floor((ARGV[3] - tonumber(redis.call('get', KEYS[3]))) / 60), 1) --stats redis.call('srem', KEYS[1], ARGV[1]) redis.call('del', KEYS[3]) redis.call('del', KEYS[4]) redis.call('del', KEYS[5]) if redis.call('scard', KEYS[1]) == 0 then -- also delete room if it is empty redis.call('hincrby', KEYS[6], redis.call('get', KEYS[2]), 1) --stats redis.call('del', KEYS[1]) redis.call('del', KEYS[2]) else -- tell others in room for _, peer_id in pairs(redis.call('smembers', KEYS[1])) do redis.call('publish', "ps:connection:" .. peer_id, ARGV[2]) end end LUA
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
Instance Method Summary collapse
- #announce_connection(ws) ⇒ Object
- #announce_shutdown(seconds = 0) ⇒ Object
-
#initialize(options = {}) ⇒ Manager
constructor
A new instance of Manager.
- #initialize_in_em ⇒ Object
- #join_room(connection_id, room_id, status) ⇒ Object
- #leave_room(connection_id) ⇒ Object
- #return_error(connection_id, message) ⇒ Object
- #send_to_peer(connection_id, peer_id, data) ⇒ Object
- #shutdown!(seconds = 0) ⇒ Object
- #unannounce_connection(ws, close_ws = false) ⇒ Object
- #update_status(connection_id, input_status) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Manager
Returns a new instance of Manager.
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/palava_machine/manager.rb', line 71 def initialize( = {}) @redis_address = 'localhost:6379' @redis_db = [:db] || 0 @connections = SocketStore.new @log = Logger.new(STDOUT) @log.level = Logger::DEBUG @log.formatter = proc{ |level, datetime, _, msg| "#{datetime.strftime '%F %T'} | #{msg}\n" } end |
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
18 19 20 |
# File 'lib/palava_machine/manager.rb', line 18 def connections @connections end |
Instance Method Details
#announce_connection(ws) ⇒ Object
91 92 93 94 95 96 97 98 99 |
# File 'lib/palava_machine/manager.rb', line 91 def announce_connection(ws) connection_id = @connections.register_connection(ws) info "#{connection_id} <open>" @subscriber.subscribe "ps:connection:#{connection_id}" do |payload| # debug "SUB payload #{payload} for <#{connection_id}>" ws.send_text(payload) end end |
#announce_shutdown(seconds = 0) ⇒ Object
246 247 248 249 250 251 252 253 254 |
# File 'lib/palava_machine/manager.rb', line 246 def announce_shutdown(seconds = 0) warn "Announcing shutdown in #{seconds} seconds" @connections.sockets.each { |ws| ws.send_text({ event: 'shutdown', seconds: seconds, }.to_json) } end |
#initialize_in_em ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/palava_machine/manager.rb', line 82 def initialize_in_em @redis = EM::Hiredis.connect "redis://#{@redis_address}/#{@redis_db}" @publisher = @redis.pubsub @subscriber = EM::Hiredis.connect("redis://#{@redis_address}/#{@redis_db}").pubsub # You need an extra connection for subs @redis.on :failed do @log.error 'Could not connect to Redis server' end end |
#join_room(connection_id, room_id, status) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/palava_machine/manager.rb', line 116 def join_room(connection_id, room_id, status) return_error connection_id, 'no room id given' if !room_id || room_id.empty? return_error connection_id, 'room id too long' if room_id.size > 50 @redis.get "store:connection:room:#{connection_id}" do |res| return_error connection_id, 'already joined another room' if res room_id = Digest::SHA512.hexdigest(room_id) info "#{connection_id} joins ##{room_id[0..10]}... #{status}" script_join_room(connection_id, room_id, status){ |members| return_error connection_id, 'room is full' unless members (connection_id, status){ if members.empty? send_joined_room(connection_id, []) else get_statuses_for_members(members) do |members_with_statuses| send_joined_room(connection_id, members_with_statuses) end end } } end end |
#leave_room(connection_id) ⇒ Object
178 179 180 181 182 183 184 185 |
# File 'lib/palava_machine/manager.rb', line 178 def leave_room(connection_id) @redis.get("store:connection:room:#{connection_id}") do |room_id| next unless room_id # return_error connection_id, 'currently not in any room' info "#{connection_id} leaves ##{room_id[0..10]}..." script_leave_room(connection_id, room_id) end end |
#return_error(connection_id, message) ⇒ Object
101 102 103 |
# File 'lib/palava_machine/manager.rb', line 101 def return_error(connection_id, ) raise MessageError.new(@connections[connection_id]), end |
#send_to_peer(connection_id, peer_id, data) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/palava_machine/manager.rb', line 226 def send_to_peer(connection_id, peer_id, data) unless data.instance_of? Hash return_error connection_id, "cannot send raw data" end @redis.get("store:connection:room:#{connection_id}") do |room_id| return_error connection_id, 'currently not in any room' unless room_id @redis.sismember("store:room:members:#{room_id}", peer_id) do |is_member| return_error connection_id, 'unknown peer' if is_member.nil? || is_member.zero? unless %w[offer answer ice_candidate].include? data['event'] return_error connection_id, 'event not allowed' end @publisher.publish "ps:connection:#{peer_id}", (data || {}).merge("sender_id" => connection_id).to_json end end end |
#shutdown!(seconds = 0) ⇒ Object
256 257 258 259 |
# File 'lib/palava_machine/manager.rb', line 256 def shutdown!(seconds = 0) sleep(seconds) @connections.dup.sockets.each{ |ws| ws.close(4200) } # TODO double check this one end |
#unannounce_connection(ws, close_ws = false) ⇒ Object
105 106 107 108 109 110 111 112 113 114 |
# File 'lib/palava_machine/manager.rb', line 105 def unannounce_connection(ws, close_ws = false) if connection_id = @connections.unregister_connection(ws) info "#{connection_id} <close>" leave_room(connection_id) @subscriber.unsubscribe "ps:connection:#{connection_id}" if close_ws && ws.state != :closed # currently not used FIXME ws.close end end end |
#update_status(connection_id, input_status) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/palava_machine/manager.rb', line 208 def update_status(connection_id, input_status) @redis.get("store:connection:room:#{connection_id}") do |room_id| return_error connection_id, 'currently not in any room' unless room_id (connection_id, input_status){ @redis.smembers("store:room:members:#{room_id}") do |members| members.each { |peer_id| @publisher.publish "ps:connection:#{peer_id}", { event: 'peer_updated_status', status: input_status, sender_id: connection_id, }.to_json } end } end end |