Class: FayeOnline

Inherits:
Object
  • Object
show all
Defined in:
lib/faye-online/message.rb,
lib/faye-online.rb

Overview

管理用户在线文档 一个用户在单个房间单个多个页面,就有了多个clientId。

  1. 用户在线状态规则

在线: 至少一个clientId;离线: 零个clientId

  2. 清理过期clientId,以防止存储在redis Hashes里的数组无法删除个别元素。因为怀疑为过期clientId之后就没有连接,所以也不会进行任何操作。

采用策略是放到一个clientId和开始时间对应的redis Hashes里,在用户‘/meta/disconnect’时检测各个clientId有效性

Defined Under Namespace

Classes: Message

Constant Summary collapse

ValidChannel =

# Channel-name validator. Returns true only when the channel (converted with
# `to_s`) consists solely of digits, the letters a-z (case-insensitive) and
# slashes; nil and the empty string are rejected.
# The pattern is anchored with \z instead of \Z: \Z also matches just before a
# trailing newline, so a value such as "/room\n" used to be accepted.

proc {|channel| !!channel.to_s.match(/\A[0-9a-z\/]+\z/i) }
MONITORED_CHANNELS =

# The channels that are monitored; '/meta/subscribe', '/connect' and '/close'
# are ignored. Frozen so the shared list cannot be mutated by accident.

['/meta/connect', '/meta/disconnect'].freeze
LOCAL_IP =
# First private IPv4 address reported for this host. When the host has no
# private IPv4 interface `detect` returns nil, which used to raise
# NoMethodError while the file was being loaded; fall back to loopback instead.
(Socket.ip_address_list.detect {|intf| intf.ipv4_private? } || Addrinfo.ip('127.0.0.1')).ip_address
LOCAL_FAYE_URI =
# URI of the '/faye' endpoint at LOCAL_IP. The port comes from the FAYE_PORT
# environment variable and defaults to 9292 when that variable is unset.
URI.parse("http://#{LOCAL_IP}:#{ENV.fetch('FAYE_PORT', 9292)}/faye")

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis_opts, valid_message_proc = nil) ⇒ FayeOnline

Returns a new instance of FayeOnline.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/faye-online.rb', line 37

# Stores the redis options and the message validator on the FayeOnline class,
# opens the redis connection and makes sure a local Faye client exists.
#
# @param redis_opts [Hash] passed to Redis.new; its :database entry is selected afterwards
# @param valid_message_proc [Proc, nil] message validator; defaults to a proc that always returns true
# @raise [RuntimeError] when $faye_server is not a Faye::RackAdapter yet
def initialize(redis_opts, valid_message_proc = nil)
  # The global server must already exist (see the error message below).
  unless $faye_server.is_a?(Faye::RackAdapter)
    raise "Please run `$faye_server = FayeOnline.get_server` first, cause we have to bind disconnect event."
  end

  FayeOnline.redis_opts = redis_opts
  FayeOnline.valid_message_proc = valid_message_proc || (proc {|message| true })

  FayeOnline.redis = Redis.new(FayeOnline.redis_opts)
  FayeOnline.redis.select FayeOnline.redis_opts[:database]

  # Reuse an already created client, otherwise connect to the local endpoint.
  FayeOnline.faye_client ||= Faye::Client.new(LOCAL_FAYE_URI.to_s)

  # ActiveRecord setup: when Rails.root is nil (presumably when not running
  # inside a Rails application - confirm) load the bundled model files.
  if Rails.root.nil?
    model_glob = File.expand_path('../../app/models/*.rb', __FILE__)
    Dir[model_glob].each {|model_file| require model_file }
  end

  self
end

Class Method Details

.channel_clientIds_arrayObject



60
61
62
63
64
65
66
67
# File 'lib/faye-online.rb', line 60

# Lists the contents of every redis hash whose key matches
# "/<namespace>/uid_to_clientIds*".
#
# Each hash value is parsed as JSON when possible and kept verbatim when
# parsing fails; the results are flattened into one list per key.
#
# @return [Array<Array>] `[key, values]` pairs, ordered by key
def self.channel_clientIds_array
  key_pattern = "/#{FayeOnline.redis_opts[:namespace]}/uid_to_clientIds*"

  FayeOnline.redis.keys(key_pattern).sort.map do |key|
    decoded = FayeOnline.redis.hgetall(key).values.map do |raw|
      begin
        JSON.parse(raw)
      rescue StandardError
        # Not valid JSON: keep the stored value as it is.
        raw
      end
    end
    [key, decoded.flatten]
  end
end

.disconnect(clientId) ⇒ Object



72
73
74
75
76
77
# File 'lib/faye-online.rb', line 72

# Processes a faked '/meta/disconnect' message for the given client id.
#
# @param clientId [Object] id placed in the message's 'clientId' field
# @return [Object, nil] the result of FayeOnline::Message#process
def self.disconnect(clientId)
  message = {'channel' => '/meta/disconnect', 'clientId' => clientId}

  # Only messages without 'auth' (i.e. the network connection was lost) are
  # faked. The hash built above never contains 'auth', so this guard does not
  # trigger as the code stands.
  return if message['auth']

  FayeOnline::Message.new(message.merge('fake' => 'true')).process
end

.get_server(redis_opts, valid_message_proc = nil) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/faye-online/server.rb', line 3

# Creates the global Faye server ($faye_server), binds its event handlers,
# registers a FayeOnline extension on it and stores the server's engine in
# FayeOnline.engine_proxy.
#
# On every :disconnect event the handler fakes a disconnect for that client and
# then checks a random sample of at most 20 stored client ids, removing those
# the engine no longer has a connection for.
#
# @param redis_opts [Hash] merged into the engine options and passed to FayeOnline.new
# @param valid_message_proc [Proc, nil] passed to FayeOnline.new
# @return [Faye::RackAdapter] the server, also available as $faye_server
def FayeOnline.get_server(redis_opts, valid_message_proc = nil)
  # The browser cannot always deliver an explicit disconnect message, which is
  # why the engine garbage-collects idle client ids (:gc):
  # https://groups.google.com/forum/#!searchin/faye-users/disconnect/faye-users/2bn8xUHF5-E/A4a3Sk7RgW4J
  engine_opts = {:gc => 60}.merge(redis_opts.merge(:type => Faye::Redis))

  $faye_server = Faye::RackAdapter.new(
    :mount   => '/faye',
    # Maximum time, in seconds, to hold a connection open before returning the
    # response. It must be smaller than the timeout of the frontend webserver
    # (Thin, whose default timeout is 30 seconds).
    # https://groups.google.com/forum/?fromgroups#!topic/faye-users/DvFrPGOinKw
    :timeout => 10,
    :engine  => engine_opts,
    # Optional: how often, in seconds, keep-alive pings are sent over WebSocket
    # and EventSource connections. Useful behind a proxy that kills idle connections.
    :ping    => 30
  )

  # These events are bound to handlers that do nothing.
  $faye_server.bind(:handshake) {|clientId| }
  $faye_server.bind(:subscribe) {|clientId, channel| }
  $faye_server.bind(:unsubscribe) {|clientId, channel| }
  $faye_server.bind(:publish) {|clientId, channel, data| }

  $faye_server.bind(:disconnect) do |clientId|
    FayeOnline.disconnect clientId

    # Keep only the keys that still hold client ids.
    populated = FayeOnline.channel_clientIds_array.reject {|entry| entry[1].blank? }
    if populated.any?
      puts "开始有 #{FayeOnline.uniq_clientIds.count}"

      candidates = populated.map(&:last).flatten.uniq.shuffle.first(20)
      candidates.each do |stale_id|
        next if FayeOnline.engine_proxy.has_connection? stale_id

        puts "开始处理无效 #{stale_id}"
        # 1. First try a faked disconnect for the client id.
        # 2. If that fails, modify redis directly.
        next if FayeOnline.disconnect(stale_id)

        # The faked disconnect did not remove it (original note: because no
        # auth had been set before).
        owner_key = (populated.detect {|_key, ids| ids.index(stale_id) } || [])[0]
        next unless owner_key

        # Remove the invalid client id straight from redis.
        FayeOnline.redis.hgetall(owner_key).each do |field, raw|
          remaining = JSON.parse(raw) rescue []
          remaining.delete stale_id
          FayeOnline.redis.hset(owner_key, field, remaining.to_json)
        end
      end

      puts "结束有 #{FayeOnline.uniq_clientIds.count}"
    end
  end

  $faye_server.add_extension FayeOnline.new(redis_opts, valid_message_proc)

  # The engine is read from the adapter's @server instance variable.
  FayeOnline.engine_proxy = $faye_server.instance_variable_get("@server").engine

  $faye_server
end

.log(params, user_name_proc = proc {|uid| uid }) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/faye-online/status.rb', line 43

# Builds a human-readable login log from FayeUserLoginLog, ordered by "t ASC".
#
# With params[:user] the logs of that user are grouped by channel and then by
# clientId, and a per-channel total is appended. Without it, the first 500 log
# rows are listed directly.
#
# @param params [Hash] only params[:user] is read
# @param user_name_proc [Proc] called with params[:user] and with each log's uid;
#   defaults to the identity proc
# @return [Array<String>] report lines
def FayeOnline.log(params, user_name_proc = proc {|uid| uid })
  time_format = "%Y-%m-%d %H:%M:%S"
  scope = FayeUserLoginLog.order("t ASC")

  # Group view: list the logs directly.
  unless params[:user]
    return scope.limit(500).map {|log|
      [%w[离开 连上][log.status], log.uid, user_name_proc.call(log.uid), log.t.strftime(time_format), FayeChannel[log.channel_id], log.clientId].inspect
    }
  end

  # Single-user view: list the logs after grouping them.
  scope = scope.where(:uid => user_name_proc.call(params[:user]))
  logs_by_channel = scope.group_by {|log| FayeChannel[log.channel_id] }

  report = ["用户 #{params[:user]}[#{user_name_proc.call(params[:user])}] 的登陆日志详情"]
  logs_by_channel.each do |channel, channel_logs|
    report << '' << channel
    logs_by_client = channel_logs.group_by {|log| log.clientId }

    # Merge overlapping time spans of all clients on this channel.
    ctc = CrossTimeCalculation.new
    logs_by_client.each do |clientId, client_logs|
      elapsed = nil
      unless client_logs.empty?
        # Only the first two entries of a client are used for timing; the
        # second one, when present, supplies the end time.
        first_log, second_log = client_logs
        ended_at = second_log ? second_log.t : nil
        ctc.add(first_log.t, ended_at)
        elapsed = CrossTimeCalculation.new.add(first_log.t, ended_at).total_seconds.to_i
      end

      entries = client_logs.map {|entry| "#{entry.status}:  #{entry.t.strftime(time_format)}" }
      report << [clientId, entries, "#{elapsed || '未知'}"].flatten.compact.inspect
    end
    report << "共用时 #{ctc.total_seconds.to_i}"
  end
  report
end

.statusObject



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/faye-online/status.rb', line 3

# Builds a textual overview of who is currently online: a header with the
# distinct client ids and users, followed by one line per stored channel key.
#
# Side effect: for every FayeChannelOnlineList with non-blank data, uids that
# are not among the currently online users are deleted from that list.
#
# @return [Array<String>] report lines
def FayeOnline.status
  channel_clientIds_array = FayeOnline.channel_clientIds_array

  # clientId => values of the stored 'current_user' hash without 'uhash'.
  # Any failure while reading or parsing yields an empty list.
  # NOTE(review): this reads through Redis.current, not FayeOnline.redis -
  # confirm both point at the same server and database.
  clientId_to_users = {}
  channel_clientIds_array.map(&:last).flatten.uniq.each do |client_id|
    auth = JSON.parse(Redis.current.get("/#{FayeOnline.redis_opts[:namespace]}/clientId_auth/#{client_id}")) rescue {}
    clientId_to_users[client_id] = (auth['current_user'] || {}).except('uhash').values
  end

  clientIds = Set.new
  channel_lines = channel_clientIds_array.map do |channel, ids|
    pairs = ids.map {|id| [id, clientId_to_users[id]] }
    user_count = pairs.map {|pair| pair[1][0] }.uniq.count
    line = "#{channel}: #{user_count}个用户:  #{pairs}"
    ids.each {|id| clientIds.add id }
    line
  end

  # Users are deduplicated by the first element of their value list.
  users = clientIds.map {|id| clientId_to_users[id] }.uniq {|user| user[0] }

  header = [
    "一个clientId表示用户打开了一个页面。一个用户在同一课时可能打开多个页面,那就是一个user,多个clientId",
    "",
    "实时在线clientIds总共有#{clientIds.count}个: #{clientIds.to_a}",
    "",
    "#{users.count}个用户分别是: #{users}",
    "",
    "/classes/[0-9]+ 用于班级讨论的消息通讯, /courses/[0-9]+/lessons/[0-9]+ 用于课时的计时",
    ""
  ]

  # Drop users from the online lists who were left behind without logging out.
  uids = users.map(&:first)
  FayeChannelOnlineList.all.reject {|list| list.data.blank? }.each do |online_list|
    online_list.user_list.each do |uid|
      online_list.delete uid unless uids.include? uid
    end
  end

  header + channel_lines
end

.uniq_clientIdsObject



68
69
70
# File 'lib/faye-online.rb', line 68

# Returns every distinct value found across all entries of
# channel_clientIds_array, in first-seen order.
#
# @return [Array]
def self.uniq_clientIds
  id_lists = channel_clientIds_array.map {|_key, ids| ids }
  id_lists.flatten.uniq
end

Instance Method Details

#incoming(message, callback) ⇒ Object



55
56
57
58
# File 'lib/faye-online.rb', line 55

# Handles an incoming message: wraps it in a Message, processes it, and then
# passes the original message on to the callback.
#
# @param message [Object] the incoming message
# @param callback [#call] invoked with +message+ after processing
# @return [Object] whatever the callback returns
def incoming(message, callback)
  handler = Message.new(message)
  handler.process
  callback.call(message)
end