Class: Juggernaut::Server

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
Miscel
Defined in:
lib/juggernaut/server.rb

Defined Under Namespace

Classes: CorruptJSON, InvalidCommand, InvalidRequest, MalformedBroadcast, MalformedQuery, MalformedSubscribe, UnauthorisedBroadcast, UnauthorisedQuery, UnauthorisedSubscription

Constant Summary collapse

# Cross-domain policy XML template. process_message sends it, with the
# literal text PORT replaced by options[:public_port] (falling back to
# options[:port]), when a client sends POLICY_REQUEST.
POLICY_FILE =
<<-EOF
  <cross-domain-policy>
    <allow-access-from domain="*" to-ports="PORT" />
  </cross-domain-policy>
EOF
# Exact message text that makes process_message answer with POLICY_FILE.
POLICY_REQUEST =
"<policy-file-request/>"
# Terminator appended to every outgoing message by publish.
# NOTE: despite the name, the value is a NUL byte ("\0"), not a carriage return.
CR =
"\0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Miscel

#config_path, #log_path, #logger, #options, #options=, #pid_path

Instance Attribute Details

#channelsObject (readonly)

Returns the value of attribute channels.



54
55
56
# File 'lib/juggernaut/server.rb', line 54

# Reader for @channels, the list of channel names held by this connection.
# post_init sets it to an empty array; add_channel appends to it and
# remove_channel! deletes from it.
def channels
  @channels
end

#clientObject (readonly)

Returns the value of attribute client.



55
56
57
# File 'lib/juggernaut/server.rb', line 55

# Reader for @client. post_init sets it to nil; when it is set, unbind and
# mark_dead call logout_connection_request, friendly_id and
# remove_connection on it.
# NOTE(review): no method shown here assigns a non-nil value — presumably a
# command handler does; confirm.
def client
  @client
end

#connectedObject (readonly)

Returns the value of attribute connected.



51
52
53
# File 'lib/juggernaut/server.rb', line 51

# Reader for @connected: set to true by post_init and to false by mark_dead.
# alive? is the predicate form of this flag.
def connected
  @connected
end

#current_msg_idObject (readonly)

Returns the value of attribute current_msg_id.



49
50
51
# File 'lib/juggernaut/server.rb', line 49

# Reader for @current_msg_id: starts at 0 in post_init and is incremented by
# broadcast before each message is built, so it holds the id of the most
# recently broadcast message.
def current_msg_id
  @current_msg_id
end

#logout_timeoutObject (readonly)

Returns the value of attribute logout_timeout.



52
53
54
# File 'lib/juggernaut/server.rb', line 52

# Reader for @logout_timeout, which post_init sets to nil.
# NOTE(review): nothing shown here assigns any other value — confirm whether
# it is still used.
def logout_timeout
  @logout_timeout
end

#messagesObject (readonly)

Returns the value of attribute messages.



50
51
52
# File 'lib/juggernaut/server.rb', line 50

# Reader for @messages.
# NOTE(review): @messages is not assigned by post_init or any other method
# shown here, so this returns nil unless it is set elsewhere — confirm.
def messages
  @messages
end

#statusObject (readonly)

Returns the value of attribute status.



53
54
55
# File 'lib/juggernaut/server.rb', line 53

# Reader for @status.
# NOTE(review): @status is not assigned by post_init or any other method
# shown here, so this returns nil unless it is set elsewhere — confirm.
def status
  @status
end

Instance Method Details

#add_channel(chan_name) ⇒ Object



189
190
191
192
# File 'lib/juggernaut/server.rb', line 189

# Subscribes this connection to a single channel.
#
# chan_name - the channel name; nil, false and the empty string are ignored.
#
# Returns @channels when the name was appended, or nil when it was ignored
# or already present.
def add_channel(chan_name)
  unusable = !chan_name || chan_name == ''
  return if unusable
  return if has_channel?(chan_name)

  @channels << chan_name
end

#add_channels(chan_names) ⇒ Object



194
195
196
197
198
# File 'lib/juggernaut/server.rb', line 194

# Subscribes this connection to every channel name in chan_names.
#
# chan_names - an array of names, a single name, or nil. The value is
#              wrapped with Kernel#Array rather than #to_a: String#to_a no
#              longer exists on Ruby 1.9+, so a bare String used to raise
#              NoMethodError here. A bare String now counts as one name and
#              nil as no names.
#
# Each name is passed to add_channel, which skips blanks and duplicates.
# Returns the array that was iterated.
def add_channels(chan_names)
  Array(chan_names).each do |chan_name|
    add_channel(chan_name)
  end
end

#alive?Boolean

Returns:

  • (Boolean)


174
175
176
# File 'lib/juggernaut/server.rb', line 174

# True only while @connected is exactly true. post_init sets the flag to
# true and mark_dead sets it to false, so a connection that has been marked
# dead never reports alive again.
def alive?
  @connected == true
end

#broadcast(bdy) ⇒ Object

Connection methods



161
162
163
164
# File 'lib/juggernaut/server.rb', line 161

# Connection methods
#
# Sends bdy to this connection as the next numbered message.
#
# bdy - the message body handed to Juggernaut::Message.new.
#
# Bumps @current_msg_id first, then builds the message from the new id, the
# body and this connection's signature, and passes it to publish.
# Returns whatever publish returns.
def broadcast(bdy)
  @current_msg_id += 1
  publish(Juggernaut::Message.new(@current_msg_id, bdy, self.signature))
end

#has_channel?(channel) ⇒ Boolean

Returns:

  • (Boolean)


185
186
187
# File 'lib/juggernaut/server.rb', line 185

# True when channel is currently in @channels (the list initialised by
# post_init and maintained by add_channel / remove_channel!).
def has_channel?(channel)
  @channels.include?(channel)
end

#has_channels?(channels) ⇒ Boolean

Returns:

  • (Boolean)


178
179
180
181
182
183
# File 'lib/juggernaut/server.rb', line 178

# True when this connection holds at least one of the given channels.
#
# channels - a collection of channel names to test with has_channel?.
#
# Returns false for an empty collection.
def has_channels?(channels)
  channels.any? { |name| has_channel?(name) }
end

#mark_dead(reason = "Unknown error") ⇒ Object



166
167
168
169
170
171
172
# File 'lib/juggernaut/server.rb', line 166

# Flags this connection as disconnected and detaches it from its client.
#
# reason - accepted for callers' benefit but not used by this method.
#
# A dead connection never comes back: a reconnecting client gets a fresh
# EM connection instance. Connections normally reach this via unbind.
#
# Returns the result of @client.remove_connection, or nil when no client
# is attached.
def mark_dead(reason = "Unknown error")
  @connected = false
  return unless @client

  @client.remove_connection(self)
end

#post_initObject

EM methods



59
60
61
62
63
64
65
66
67
# File 'lib/juggernaut/server.rb', line 59

# EM methods
#
# Sets up per-connection state for a newly accepted connection: logs the
# peer address, then starts with no client, no channels, message id 0,
# the connected flag on, no logout timeout and an empty receive buffer.
def post_init
  logger.debug("New client [#{client_ip}]")
  @client = @logout_timeout = nil
  @channels = []
  @current_msg_id = 0
  @connected = true
  @buffer = ''
end

#process_message(ln) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/juggernaut/server.rb', line 92

# Handles one complete message received from the client.
#
# ln - the message text with its terminator already removed.
#
# A message equal to POLICY_REQUEST is answered with POLICY_FILE (PORT
# replaced by the configured public port, or port) and the connection is
# closed once the reply is written. Any other message is parsed as JSON,
# normalised into @request and dispatched on its "command" value to
# broadcast_command, subscribe_command, query_command or noop_command.
#
# Nothing propagates to the caller. A JuggernautError is logged and the
# connection is closed; any other StandardError is only logged so the
# event loop keeps running. (CorruptJSON, InvalidRequest and InvalidCommand
# are assumed to descend from JuggernautError — confirm.)
def process_message(ln)
  logger.debug "Processing message: #{ln}"
  @request = nil

  if ln == POLICY_REQUEST
    logger.debug "Sending crossdomain file"
    send_data POLICY_FILE.gsub('PORT', (options[:public_port] || options[:port]).to_s)
    close_connection_after_writing
    return
  end

  begin
    @request = JSON.parse(ln) unless ln.empty?
  rescue
    raise CorruptJSON, ln
  end

  # Empty input, JSON null, and valid JSON that is not an object (array,
  # number, string) cannot carry a command. Reject them here instead of
  # letting symbolize_keys! fail with a NoMethodError that the generic
  # rescue below would log without closing the connection.
  raise InvalidRequest, ln unless @request.is_a?(Hash)

  @request.symbolize_keys!

  # For debugging
  @request[:ip] = client_ip

  # Array() accepts a missing value, a single scalar or a list; blanks and
  # duplicates are dropped.
  @request[:channels] = Array(@request[:channels]).select { |c| c && c != '' }.uniq

  if @request[:client_ids]
    @request[:client_ids] = Array(@request[:client_ids]).select { |c| c && c != '' }.uniq
  end

  # Compare as a string: a missing command becomes '' and falls through to
  # InvalidCommand, and client-supplied text is never turned into a Symbol.
  case @request[:command].to_s
  when 'broadcast'
    broadcast_command
  when 'subscribe'
    subscribe_command
  when 'query'
    query_command
  when 'noop'
    noop_command
  else
    raise InvalidCommand, @request
  end

rescue JuggernautError => e
  logger.error("#{e} - #{e.message.inspect}")
  close_connection
# So as to stop em quitting
rescue => e
  logger ? logger.error(e) : puts(e)
end

#process_whole_messages(data) ⇒ Object

Processes any whole messages in the buffer and returns the new contents of the buffer.



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/juggernaut/server.rb', line 79

# Processes every complete ("\0"-terminated) message in data and returns
# whatever is left over for the next read.
#
# data - the accumulated receive buffer.
#
# Each complete message is stripped and passed to process_message.
# Returns data unchanged when it contains no "\0"; otherwise returns '' when
# data ends with "\0", or the trailing incomplete fragment when it does not.
def process_whole_messages(data)
  return data unless data.include?("\0") # nothing complete to process yet

  complete = data.split("\0")
  # Anchor with \z (end of string), not $ (end of line): with $ a buffer
  # such as "a\0\nb" looked fully terminated, so the incomplete fragment
  # "\nb" was processed early and dropped from the buffer.
  leftover = data =~ /\0\z/ ? '' : complete.pop
  complete.each { |message| process_message(message.strip) }
  leftover
end

#publish(msg) ⇒ Object

As far as I’m aware, send_data never throws an exception



154
155
156
157
# File 'lib/juggernaut/server.rb', line 154

# Writes msg to the client, followed by the CR terminator, after logging it.
#
# msg - anything responding to to_s.
#
# As far as I'm aware, send_data never throws an exception.
#
# The debug line includes the client and session ids from the last parsed
# request. @request is nil until a request has been parsed (and again after
# a parse failure), so fall back to an empty hash rather than raising
# NoMethodError before the data is sent.
def publish(msg)
  request = @request.is_a?(Hash) ? @request : {}
  logger.debug "Sending msg: #{msg.to_s} to client #{request[:client_id]} (session #{request[:session_id]})"
  send_data(msg.to_s + CR)
end

#receive_data(data) ⇒ Object

Juggernaut packets are terminated with "\0" (a NUL byte), so we need to buffer the data until we find the terminating "\0".



72
73
74
75
# File 'lib/juggernaut/server.rb', line 72

# Appends the newly received bytes to @buffer, lets process_whole_messages
# consume every complete message, and keeps whatever it returns as the new
# buffer.
#
# data - the chunk just read from the socket.
def receive_data(data)
  @buffer = process_whole_messages(@buffer << data)
end

#remove_channel!(chan_name) ⇒ Object



200
201
202
# File 'lib/juggernaut/server.rb', line 200

# Removes chan_name from @channels and returns the result of
# @channels.delete (for the Array set up in post_init: the removed name, or
# nil when it was not present).
def remove_channel!(chan_name)
  @channels.delete(chan_name)
end

#remove_channels!(chan_names) ⇒ Object



204
205
206
207
208
# File 'lib/juggernaut/server.rb', line 204

# Unsubscribes this connection from every channel name in chan_names.
#
# chan_names - an array of names, a single name, or nil. The value is
#              wrapped with Kernel#Array rather than #to_a: String#to_a no
#              longer exists on Ruby 1.9+, so a bare String used to raise
#              NoMethodError here. A bare String now counts as one name and
#              nil as no names.
#
# Each name is passed to remove_channel!. Returns the array that was iterated.
def remove_channels!(chan_names)
  Array(chan_names).each do |chan_name|
    remove_channel!(chan_name)
  end
end

#unbindObject



143
144
145
146
147
148
149
150
# File 'lib/juggernaut/server.rb', line 143

# Called when the connection goes away. If a client is attached, asks it to
# log out of this connection's channels and logs the loss; in every case the
# connection is then marked dead.
#
# Returns whatever mark_dead returns.
def unbind
  if @client
    # TODO: should the logout request only be made after a timeout?
    @client.logout_connection_request(@channels)
    logger.debug("Lost client #{@client.friendly_id}")
  end

  mark_dead('Unbind called')
end