Class: PalavaMachine::Manager

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/palava_machine/manager.rb

Constant Summary collapse

PAYLOAD_NEW_PEER =
lambda { |connection_id, status = nil|
  payload = { event: 'new_peer', peer_id: connection_id }
  payload[:status] = status if status
  payload.to_json
}
PAYLOAD_PEER_LEFT =
lambda { |connection_id| {
  event: 'peer_left',
  sender_id: connection_id,
}.to_json }
SCRIPT_JOIN_ROOM =
<<-LUA
  local members = redis.call('smembers', KEYS[1])
  local count = 0
  for _, peer_id in pairs(members) do
    redis.call('publish', "ps:connection:" .. peer_id, ARGV[2])
    count = count + 1
  end
  redis.call('sadd', KEYS[1], ARGV[1])
  if count == 0 or tonumber(redis.call('get', KEYS[2])) <= count then
    redis.call('set', KEYS[2], count + 1)
  end
  redis.call('set', KEYS[3], ARGV[3])
  redis.call('set', KEYS[4], ARGV[4])
  return members
LUA
SCRIPT_LEAVE_ROOM =
<<-LUA
  redis.call('hincrby', KEYS[7], math.floor((ARGV[3] - tonumber(redis.call('get', KEYS[3]))) / 60), 1) --stats
  redis.call('srem', KEYS[1], ARGV[1])
  redis.call('del', KEYS[3])
  redis.call('del', KEYS[4])
  redis.call('del', KEYS[5])

  if redis.call('scard', KEYS[1]) == 0 then -- also delete room if it is empty
    redis.call('hincrby', KEYS[6], redis.call('get', KEYS[2]), 1) --stats
    redis.call('del', KEYS[1])
    redis.call('del', KEYS[2])
  else -- tell others in room
    for _, peer_id in pairs(redis.call('smembers', KEYS[1])) do
      redis.call('publish', "ps:connection:" .. peer_id, ARGV[2])
    end
  end
LUA

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Manager

Returns a new instance of Manager.



71
72
73
74
75
76
77
78
79
80
# File 'lib/palava_machine/manager.rb', line 71

def initialize(options = {})
  @redis_address = 'localhost:6379'
  @redis_db      = options[:db] || 0
  @connections   = SocketStore.new
  @log           = Logger.new(STDOUT)
  @log.level     = Logger::DEBUG
  @log.formatter = proc{ |level, datetime, _, msg|
    "#{datetime.strftime '%F %T'} | #{msg}\n"
  }
end

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



18
19
20
# File 'lib/palava_machine/manager.rb', line 18

def connections
  @connections
end

Instance Method Details

#announce_connection(ws) ⇒ Object



91
92
93
94
95
96
97
98
99
# File 'lib/palava_machine/manager.rb', line 91

def announce_connection(ws)
  connection_id = @connections.register_connection(ws)
  info "#{connection_id} <open>"

  @subscriber.subscribe "ps:connection:#{connection_id}" do |payload|
    # debug "SUB payload #{payload} for <#{connection_id}>"
    ws.send_text(payload)
  end
end

#announce_shutdown(seconds = 0) ⇒ Object



246
247
248
249
250
251
252
253
254
# File 'lib/palava_machine/manager.rb', line 246

def announce_shutdown(seconds = 0)
  warn "Announcing shutdown in #{seconds} seconds"
  @connections.sockets.each { |ws|
    ws.send_text({
      event: 'shutdown',
      seconds: seconds,
    }.to_json)
  }
end

#initialize_in_emObject



82
83
84
85
86
87
88
89
# File 'lib/palava_machine/manager.rb', line 82

def initialize_in_em
  @redis      = EM::Hiredis.connect "redis://#{@redis_address}/#{@redis_db}"
  @publisher  = @redis.pubsub
  @subscriber = EM::Hiredis.connect("redis://#{@redis_address}/#{@redis_db}").pubsub # You need an extra connection for subs
  @redis.on :failed do
    @log.error 'Could not connect to Redis server'
  end
end

#join_room(connection_id, room_id, status) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/palava_machine/manager.rb', line 116

def join_room(connection_id, room_id, status)
  return_error connection_id, 'no room id given' if !room_id || room_id.empty?
  return_error connection_id, 'room id too long' if room_id.size > 50

  @redis.get "store:connection:room:#{connection_id}" do |res|
    return_error connection_id, 'already joined another room' if res
    room_id = Digest::SHA512.hexdigest(room_id)
    info "#{connection_id} joins ##{room_id[0..10]}... #{status}"

    script_join_room(connection_id, room_id, status){ |members|
      return_error connection_id, 'room is full' unless members

      update_status_without_notifying_peers(connection_id, status){
        if members.empty?
          send_joined_room(connection_id, [])
        else
          get_statuses_for_members(members) do |members_with_statuses|
            send_joined_room(connection_id, members_with_statuses)
          end
        end
      }
    }
  end
end

#leave_room(connection_id) ⇒ Object



178
179
180
181
182
183
184
185
# File 'lib/palava_machine/manager.rb', line 178

def leave_room(connection_id)
  @redis.get("store:connection:room:#{connection_id}") do |room_id|
    next unless room_id # return_error connection_id, 'currently not in any room'

    info "#{connection_id} leaves ##{room_id[0..10]}..."
    script_leave_room(connection_id, room_id)
  end
end

#return_error(connection_id, message) ⇒ Object

Raises:



101
102
103
# File 'lib/palava_machine/manager.rb', line 101

def return_error(connection_id, message)
  raise MessageError.new(@connections[connection_id]), message
end

#send_to_peer(connection_id, peer_id, data) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/palava_machine/manager.rb', line 226

def send_to_peer(connection_id, peer_id, data)
  unless data.instance_of? Hash
    return_error connection_id, "cannot send raw data"
  end

  @redis.get("store:connection:room:#{connection_id}") do |room_id|
    return_error connection_id, 'currently not in any room' unless room_id

    @redis.sismember("store:room:members:#{room_id}", peer_id) do |is_member|
      return_error connection_id, 'unknown peer' if is_member.nil? || is_member.zero?

      unless %w[offer answer ice_candidate].include? data['event']
        return_error connection_id, 'event not allowed'
      end

      @publisher.publish "ps:connection:#{peer_id}", (data || {}).merge("sender_id" => connection_id).to_json
    end
  end
end

#shutdown!(seconds = 0) ⇒ Object



256
257
258
259
# File 'lib/palava_machine/manager.rb', line 256

def shutdown!(seconds = 0)
  sleep(seconds)
  @connections.dup.sockets.each{ |ws| ws.close(4200) } # TODO double check this one
end

#unannounce_connection(ws, close_ws = false) ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/palava_machine/manager.rb', line 105

def unannounce_connection(ws, close_ws = false)
  if connection_id = @connections.unregister_connection(ws)
    info "#{connection_id} <close>"
    leave_room(connection_id)
    @subscriber.unsubscribe "ps:connection:#{connection_id}"
    if close_ws && ws.state != :closed # currently not used FIXME
      ws.close
    end
  end
end

#update_status(connection_id, input_status) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/palava_machine/manager.rb', line 208

def update_status(connection_id, input_status)
  @redis.get("store:connection:room:#{connection_id}") do |room_id|
    return_error connection_id, 'currently not in any room' unless room_id

    update_status_without_notifying_peers(connection_id, input_status){
      @redis.smembers("store:room:members:#{room_id}") do |members|
        members.each { |peer_id|
          @publisher.publish "ps:connection:#{peer_id}", {
            event: 'peer_updated_status',
            status: input_status,
            sender_id: connection_id,
          }.to_json
        }
      end
    }
  end
end