Class: FayeOnline::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/faye-online/message.rb

Overview

To allow multiple messages process in their own Message instance.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(_msg) ⇒ Message

Returns a new instance of Message.



16
17
18
19
20
# File 'lib/faye-online/message.rb', line 16

def initialize _msg
  self.message = _msg
  self.current_channel # load message.auth
  self
end

Instance Attribute Details

#messageObject

Returns the value of attribute message.



15
16
17
# File 'lib/faye-online/message.rb', line 15

def message
  @message
end

Instance Method Details

#current_channelObject

渠道信息



138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/faye-online/message.rb', line 138

def current_channel
  # 从clientId反设置auth信息,并只设置一次
  if message['auth'] && !message['_is_auth_load']
    FayeOnline.redis.multi do
      FayeOnline.redis.set(redis_key__auth, message['auth'].to_json)
      FayeOnline.redis.expire(redis_key__auth, 2.weeks)
    end
    message['_is_auth_load'] = true
  else
    message['auth'] ||= JSON.parse(FayeOnline.redis.get(redis_key__auth)) rescue {}
  end
  message['channel']
end

#current_clientIdObject



156
# File 'lib/faye-online/message.rb', line 156

def current_clientId; message['clientId'] end

#current_userObject

用户信息



155
# File 'lib/faye-online/message.rb', line 155

def current_user; message['auth']['current_user'] end

#current_user_current_clientIdsObject



177
178
179
# File 'lib/faye-online/message.rb', line 177

def current_user_current_clientIds
  current_user_current_clientIds_arrays.map(&:to_a).flatten.uniq
end

#current_user_current_clientIds_arraysObject



174
175
176
# File 'lib/faye-online/message.rb', line 174

def current_user_current_clientIds_arrays
  [current_user_in_current_room__clientIds, current_user_in_current_time__clientIds]
end

#current_user_in_current_room__clientIdsObject

room和time 分别对应 clientId 的关系



159
160
161
162
163
164
# File 'lib/faye-online/message.rb', line 159

def current_user_in_current_room__clientIds
  @_current_user_in_current_room__clientIds ||= begin
    _a = JSON.parse(FayeOnline.redis.hget(redis_key__room, current_user['uid'])) rescue []
    Set.new(_a)
  end
end

#current_user_in_current_time__clientIdsObject



165
166
167
168
169
170
# File 'lib/faye-online/message.rb', line 165

def current_user_in_current_time__clientIds
  @_current_user_in_current_time__clientIds ||= begin
    _a = JSON.parse(FayeOnline.redis.hget(redis_key__time, current_user['uid'])) rescue []
    Set.new(_a)
  end
end

#logger_online_info(status) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/faye-online/message.rb', line 109

def logger_online_info status
  _t = online_time.start_time(time_channel)
  _start_time_str_ = _t ? Time.parse(_t).strftime("%Y-%m-%d %H:%M:%S") : nil
  puts "*#{status}* 用户#{current_user['uname']}[#{current_user['uid']}] 的clientId #{current_clientId}"
  puts "当前用户在 #{redis_key__room} 的clientIds列表为 #{current_user_in_current_room__clientIds.inspect}。在线用户有#{online_list.user_list.count}个,他们是 #{online_list.user_list.inspect}"
  puts "当前用户在 #{redis_key__time} 的clientIds列表为 #{current_user_in_current_time__clientIds.inspect}。开始时间为#{_start_time_str_}, 渡过时间为 #{online_time.spend_time(time_channel) || ''}"

  # 记录用户登陆登出情况,方便之后追踪
  _s = {"连上" => 1, "离开" => 0}[status]
  # 用可以过期的Redis键值对来检测单个clientId上次是否为 "连上" 或 "离开"
  _k = "/#{FayeOnline.redis_opts[:namespace]}/clientId_status/#{current_clientId}"
  _s_old = FayeOnline.redis.get(_k).to_s
  # *连上*和*离开* 只能操作一次
  if _s_old.blank? || # 没登陆的
    (_s.to_s != _s_old) # 已登陆的

    # 把不*连上*和*离开*把放一张表,写时不阻塞
    FayeUserLoginLog.create! :channel_id => FayeChannel[time_channel], :uid => current_user['uid'], :t => DateTime.now, :status => _s, :clientId => current_clientId

    FayeOnline.redis.multi do
      FayeOnline.redis.set(_k, _s)
      FayeOnline.redis.expire(_k, 2.weeks)
    end
  end
end

#online_listObject



134
# File 'lib/faye-online/message.rb', line 134

def online_list; FayeChannelOnlineList.find_or_create_by_channel_id(FayeChannel[room_channel]) end

#online_timeObject



135
# File 'lib/faye-online/message.rb', line 135

def online_time; FayeUserOnlineTime.find_or_create_by_user_id(current_user['uid']) end

#processObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/faye-online/message.rb', line 22

def process
  @time_begin = Time.now

  # 主动检测是否离开,以减少GC的消耗
  # TODO re-implement autodisonnect, pull request to faye.gem
  if message['channel'] == "/faye_online/before_leave"
    EM.run do
      EM.add_timer(3) do
        FayeOnline.disconnect(current_clientId) if not FayeOnline.engine_proxy.has_connection?(current_clientId)
      end
    end
    return false;
  end

  # 验证是否用FayeOnline处理
  step_idx = MONITORED_CHANNELS.index(message['channel'])
  return false if step_idx.nil?

  # 验证配置参数是否正确
  return false if !(message['auth'] && (room_channel && time_channel && current_user))
  raise "#{current_user.inspect} is invalid, the correct data struct is, .e.g. {uid: 470700, uname: 'mvj3'}" if !(current_user["uid"] && current_user["uname"])

  # 验证渠道名字是否合法
  (puts "invalid channel => #{message.inspect}" if ENV['DEBUG']; return false) if !(ValidChannel.call(message['auth']['room_channel']) && ValidChannel.call(message['auth']['time_channel']))
  # 验证message是否合法
  (puts "invalid message => #{message.inspect}" if ENV['DEBUG']; return false) if not FayeOnline.valid_message_proc.call(message)

  begin
    case step_idx

    # A. 处理*开启*一个客户端
    when 0
      ### 处理 room_channel 在线人数
      # *开始1* add 当前用户的clientIds数组
      current_user_in_current_room__clientIds.add(current_clientId)
      online_list.add current_user['uid']

      ### 处理 time_chanel 在线时长
      current_user_in_current_time__clientIds.add(current_clientId)
      online_time.start time_channel if not online_time.start? time_channel
      logger_online_info "连上"

      # 绑定用户数据到clientId,以供服务器在主动disconnect时使用

    # B. 处理*关闭*一个客户端(,但是这个用户可能还有其他客户端在连着)
    when 1
      # 解除 因意外原因导致的 clientId 没有过期
      current_user_current_clientIds.each do |_clientId|
        str = if FayeOnline.engine_proxy.has_connection?(_clientId)
          "clientId[#{_clientId}] 还有连接"
        else
          [current_user_current_clientIds_arrays, current_user_in_current_time__clientIds].each {|a| a.delete _clientId }
          "clientId[#{_clientId}] 没有连接的无效 已被删除"
        end
        puts str if ENV['DEBUG']
      end
      puts if ENV['DEBUG']

      ### 处理 room_channel 在线人数
      # *开始2* delete 当前用户的clientIds数组
      current_user_in_current_room__clientIds.delete(current_clientId)
      online_list.delete current_user['uid'] if current_user_in_current_room__clientIds.blank? # 一个uid的全部clientId都退出了

      ### 处理 time_chanel 在线时长
      # 关闭定时在线时间
      current_user_in_current_time__clientIds.delete(current_clientId)
      online_time.stop time_channel if current_user_in_current_time__clientIds.size.zero?
      logger_online_info "离开"
    end

  rescue => e # 确保每次都正常存储current_user_in_current_room__clientIds
    puts [e, e.backtrace].flatten.join("\n")
  end

  # *结束* save 当前用户的clientIds数组
  FayeOnline.redis.hset redis_key__room, current_user['uid'], current_user_in_current_room__clientIds.to_json
  FayeOnline.redis.hset redis_key__time, current_user['uid'], current_user_in_current_time__clientIds.to_json

  # 发布在线用户列表
  FayeOnline.faye_client.publish("#{room_channel}/user_list", {'count' => online_list.user_count, 'user_list' => online_list.user_list}) if FayeOnline.faye_client

  puts "本次处理处理时间 #{((Time.now - @time_begin) * 1000).round(2)}ms" if ENV['DEBUG']
  puts message.inspect
  puts
  return true
end

#redis_key__authObject



173
# File 'lib/faye-online/message.rb', line 173

def redis_key__auth; "/#{FayeOnline.redis_opts[:namespace]}/clientId_auth/#{current_clientId}" end

#redis_key__roomObject



171
# File 'lib/faye-online/message.rb', line 171

def redis_key__room; "/#{FayeOnline.redis_opts[:namespace]}/uid_to_clientIds#{room_channel}" end

#redis_key__timeObject



172
# File 'lib/faye-online/message.rb', line 172

def redis_key__time; "/#{FayeOnline.redis_opts[:namespace]}/uid_to_clientIds#{time_channel}" end

#room_channelObject



152
# File 'lib/faye-online/message.rb', line 152

def room_channel; message['auth']['room_channel'] end

#time_channelObject



151
# File 'lib/faye-online/message.rb', line 151

def time_channel; message['auth']['time_channel'] end