Class: Myxi::Session

Inherits:
EventableSocket show all
Defined in:
lib/myxi/session.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from EventableSocket

#close_after_write, #handle_w, #write

Constructor Details

#initialize(event_loop, client_socket) ⇒ Session

Returns a new instance of Session.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/myxi/session.rb', line 9

def initialize(event_loop, client_socket)
  @id  = SecureRandom.hex(8)
  @closure_callbacks = []
  @data = {}

  @handshake = WebSocket::Handshake::Server.new
  @state = :handshake
  super
  @event_loop.sessions << self

end

Instance Attribute Details

#auth_objectObject

attr_accessor :queue



23
24
25
# File 'lib/myxi/session.rb', line 23

def auth_object
  @auth_object
end

#idObject (readonly)

Returns the value of attribute id.



21
22
23
# File 'lib/myxi/session.rb', line 21

def id
  @id
end

#tagObject

Returns the value of attribute tag.



24
25
26
# File 'lib/myxi/session.rb', line 24

def tag
  @tag
end

Instance Method Details

#[](name) ⇒ Object



83
84
85
# File 'lib/myxi/session.rb', line 83

def [](name)
  @data[name.to_sym]
end

#[]=(name, value) ⇒ Object



87
88
89
90
# File 'lib/myxi/session.rb', line 87

def []=(name, value)
  Myxi.logger.debug "[#{id}] Stored '#{name}' with '#{value}'"
  @data[name.to_sym] = value
end

#closeObject

Called when the connection for this session is closed



183
184
185
186
187
188
189
190
191
# File 'lib/myxi/session.rb', line 183

def close
  Myxi.logger.debug "[#{id}] Session closed"
  @event_loop.sessions.delete(self)
  @queue.delete if @queue
  while callback = @closure_callbacks.shift
    callback.call
  end
  super
end

#handle_rObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/myxi/session.rb', line 46

def handle_r
  case @state
  when :handshake
    @handshake << @socket.readpartial(1048576)
    if @handshake.finished?
      write(@handshake.to_s)
      if @handshake.valid?
        on_connect
        @state = :established
        @frame_handler = WebSocket::Frame::Incoming::Server.new(version: @handshake.version)
      else
        close_after_write
      end
    end
  when :established
    @frame_handler << @socket.readpartial(1048576)
    while frame = @frame_handler.next
      msg = frame.data
      json = JSON.parse(msg) rescue nil
      if json.is_a?(Hash)
        tag = json['tag'] || nil
        payload = json['payload'] || {}
        Myxi.logger.debug "[#{id}] \e[43;37mACTION\e[0m \e[33m#{json}\e[0m"
        if action = Myxi::Action::ACTIONS[json['action'].to_s.to_sym]
          action.execute(self, payload)
        else
          send_text_data({:event => 'Error', :tag => tag, :payload => {:error => 'InvalidAction'}}.to_json)
        end
      else
        send_text_data({:event => 'Error', :payload => {:error => 'InvalidJSON'}}.to_json)
      end
    end
  end
rescue EOFError, Errno::ECONNRESET, IOError
  close
end

#on_close(&block) ⇒ Object

Adds a callback to be executed when this session closes



196
197
198
# File 'lib/myxi/session.rb', line 196

def on_close(&block)
  @closure_callbacks << block
end

#on_connectObject



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/myxi/session.rb', line 26

def on_connect
  Myxi.logger.debug "[#{id}] Connection opened"
  send_text_data({:event => 'Welcome', :payload => {:id => id}}.to_json)
  begin
    @queue = Myxi.channel.queue("", :exclusive => true)
  rescue NoMethodError
    # This exception may be raised when something goes very wrong with the RabbitMQ connection
    # Unfortunately the only practical solution is to restart the client
    Process.exit(1)
  end
  @queue.subscribe do |delivery_info, properties, body|
    if hash = JSON.parse(body) rescue nil
      hash['mq'] = {'e' => delivery_info.exchange, 'rk' => delivery_info.routing_key}
      payload = hash.to_json.force_encoding('UTF-8')
      Myxi.logger.debug "[#{id}] \e[45;37mEVENT\e[0m \e[35m#{payload}\e[0m (to #{delivery_info.exchange}/#{delivery_info.routing_key})"
      send_text_data(payload)
    end
  end
end

#send(name, payload = {}) ⇒ Object

 Send an event back to the client on this session



102
103
104
105
106
# File 'lib/myxi/session.rb', line 102

def send(name, payload = {})
  payload = {:event => name, :tag => tag, :payload => payload}.to_json.force_encoding('UTF-8')
  send_text_data(payload)
  Myxi.logger.debug "[#{id}] \e[46;37mMESSAGE\e[0m \e[36m#{payload}\e[0m"
end

#send_text_data(data) ⇒ Object



200
201
202
203
# File 'lib/myxi/session.rb', line 200

def send_text_data(data)
  sender = WebSocket::Frame::Outgoing::Server.new(version: @handshake.version, data: data, type: :text)
  write(sender.to_s)
end

#subscribe(exchange_name, routing_key) ⇒ Object

Subscribe this session to receive items for the given exchange & routing key



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/myxi/session.rb', line 111

def subscribe(exchange_name, routing_key)
  if exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_sym]
    if exchange.can_subscribe?(routing_key, self.auth_object)
      subscriptions[exchange_name.to_s] ||= []
      if subscriptions[exchange_name.to_s].include?(routing_key.to_s)
        send('Error', :error => 'AlreadySubscribed', :exchange => exchange_name, :routing_key => routing_key)
      else
        @queue.bind(exchange.exchange_name.to_s, :routing_key => routing_key.to_s)
        subscriptions[exchange_name.to_s] << routing_key.to_s
        Myxi.logger.debug "[#{id}] \e[42;37mSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m"
        send('Subscribed', :exchange => exchange_name, :routing_key => routing_key)
      end
    else
      send('Error', :error => 'SubscriptionDenied', :exchange => exchange_name, :routing_key => routing_key)
    end
  else
    send('Error', :error => 'InvalidExchange', :exchange => exchange_name)
  end
end

#subscriptionsObject

 Keep track of all subscriptions



95
96
97
# File 'lib/myxi/session.rb', line 95

def subscriptions
  @subscriptions ||= {}
end

#touchObject

Called by the server every so often whenever this session is active. This should verify that subscriptions are still valid etc…



167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/myxi/session.rb', line 167

def touch
  subscriptions.each do |exchange_name, routing_keys|
    if exchange = Myxi::Exchange::EXCHANGES[exchange_name.to_sym]
      routing_keys.each do |routing_key|
        unless exchange.can_subscribe?(routing_key, self.auth_object)
          Myxi.logger.info "[#{id}] Session is not longer allowed to subscibe to #{exchange_name}/#{routing_key}"
          unsubscribe(exchange_name, routing_key, true)
        end
      end
    end
  end
end

#unsubscribe(exchange_name, routing_key, auto = false) ⇒ Object

Unsubscribe this session from the given exchange name and routing key



134
135
136
137
138
139
140
141
# File 'lib/myxi/session.rb', line 134

def unsubscribe(exchange_name, routing_key, auto = false)
  @queue.unbind(exchange_name.to_s, :routing_key => routing_key.to_s)
  if subscriptions[exchange_name.to_s]
    subscriptions[exchange_name.to_s].delete(routing_key.to_s)
  end
  Myxi.logger.debug "[#{id}] \e[42;37mUNSUBSCRIBED\e[0m \e[32m#{exchange_name} / #{routing_key}\e[0m"
  send('Unsubscribed', :exchange_name => exchange_name, :routing_key => routing_key, :auto => auto)
end

#unsubscribe_allObject

Unsubscribe all



157
158
159
160
161
# File 'lib/myxi/session.rb', line 157

def unsubscribe_all
  self.subscriptions.keys.each do |exchange_name|
    self.unsubscribe_all_for_exchange(exchange_name)
  end
end

#unsubscribe_all_for_exchange(exchange_name) ⇒ Object

Unscubribe all for an exchange



146
147
148
149
150
151
152
# File 'lib/myxi/session.rb', line 146

def unsubscribe_all_for_exchange(exchange_name)
  if array = self.subscriptions[exchange_name.to_s]
    array.dup.each do |routing_key|
      self.unsubscribe(exchange_name.to_s, routing_key)
    end
  end
end