Class: StompServer::Protocols::Stomp

Inherits:
EventMachine::Connection
  • Object
show all
Defined in:
lib/stomp_server_ng/protocols/stomp.rb

Overview

Stomp Protocol Handler.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Stomp

Protocol handler initialization



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 24

# Protocol handler initialization.
#
# +args+ are the values handed over by EventMachine::start_server, in order:
# auth_required flag, queue manager, topic manager, stomp auth object, and
# optionally a trailing options Hash.
#
# Everything except the session ID is stored in class variables, so those
# values are shared by all connections and are reassigned each time a new
# connection is constructed.
def initialize(*args)
  super(*args)
  # Logging setup.
  @@log = Logger.new(STDOUT)
  @@log.level = StompServer::LogHelper.get_loglevel()
  # A trailing Hash, when present, carries the options; strip it off so the
  # remaining entries are purely positional.
  @@options = args.last.kind_of?(Hash) ? args.pop : {}
  # Positional arguments (missing ones become nil).
  @@auth_required, @@queue_manager, @@topic_manager, @@stompauth = args.values_at(0, 1, 2, 3)
  #
  # N.B.: The session ID is an instance variable!
  #
  session_cache = @@options[:session_cache]
  @session_id = if session_cache == 0
                  # No cache configured: derive an ID from the current time.
                  "ssng_#{Time.now.to_f}"
                else
                  StompServer::SessionIDManager.get_cache_id(session_cache)
                end
  @@log.debug("#{@session_id} #{self} Session ID assigned")
  @@log.warn("#{@session_id} #{self} Protocol initialization complete")
end

Instance Attribute Details

#session_id ⇒ Object (readonly)

Returns the value of attribute session_id.



21
22
23
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 21

# Read accessor for the session ID that #initialize assigns to this
# connection. The value is a String of the form "ssng_<timestamp>" when the
# session cache option is 0; otherwise it is whatever
# StompServer::SessionIDManager.get_cache_id returned.
def session_id
  @session_id
end

Instance Method Details

#abort(frame, trans = nil) ⇒ Object

Stomp Protocol - ABORT



184
185
186
187
188
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 184

# Stomp Protocol - ABORT
#
# Drops transaction +trans+ together with every frame queued under it.
# +frame+ is accepted for signature parity with the other verbs and is not
# used here.
#
# Returns the list of frames that had been queued for the transaction.
# Raises RuntimeError when +trans+ is missing or names an unknown transaction.
def abort(frame, trans=nil)
  raise "#{@session_id} Missing transaction" if !trans
  if @transactions.key?(trans)
    @transactions.delete(trans)
  else
    raise "#{@session_id} transaction does not exist"
  end
end

#ack(frame) ⇒ Object

Stomp Protocol - ACK

Delegated to the queue manager.



194
195
196
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 194

# Stomp Protocol - ACK
#
# Hands the ACK frame, together with this connection, to the queue manager.
# Returns whatever the queue manager's +ack+ returns.
def ack(frame)
  @@queue_manager.ack(self, frame)
end

#begin(frame, trans = nil) ⇒ Object

Stomp Protocol - BEGIN



200
201
202
203
204
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 200

# Stomp Protocol - BEGIN
#
# Opens transaction +trans+ with an empty list of queued frames.
# +frame+ is accepted for signature parity with the other verbs and is not
# used here.
#
# Returns the new (empty) frame list.
# Raises RuntimeError when +trans+ is missing or is already open.
def begin(frame, trans=nil)
  if !trans
    raise "#{@session_id} Missing transaction"
  elsif @transactions.key?(trans)
    raise "#{@session_id} transaction exists"
  end
  @transactions.store(trans, [])
end

#commit(frame, trans = nil) ⇒ Object

Stomp Protocol - COMMIT



208
209
210
211
212
213
214
215
216
217
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 208

# Stomp Protocol - COMMIT
#
# Replays, in arrival order, every frame queued under transaction +trans+
# through process_frame, then forgets the transaction. The 'transaction'
# header is stripped from each frame first so that the replay is routed as
# an ordinary, non-transactional frame.
#
# Returns the list of frames that were replayed.
# Raises RuntimeError when +trans+ is missing or names an unknown transaction.
#
# NOTE(review): a queued frame that still carries a 'receipt' header goes
# through process_frame a second time here, so it looks like its RECEIPT is
# emitted twice (once when queued, once on commit) - confirm whether that is
# intended.
def commit(frame, trans=nil)
  raise "#{@session_id} Missing transaction" if !trans
  raise "#{@session_id} transaction does not exist" if !@transactions.key?(trans)
  queued_frames = @transactions[trans]
  queued_frames.each do |queued|
    queued.headers.delete('transaction')
    process_frame(queued)
  end
  @transactions.delete(trans)
end

#connect(frame) ⇒ Object

Stomp Protocol - CONNECT



221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 221

# Stomp Protocol - CONNECT
#
# When authentication is required, the frame must carry 'login' and
# 'passcode' headers, and the passcode must equal the one the auth object
# has on record for that login. On success a CONNECTED frame carrying this
# connection's session ID is sent and the connection is marked connected.
#
# Raises RuntimeError ("Invalid Login") when authentication is required and
# the credentials are absent or do not match.
def connect(frame)
  if @@auth_required
    login = frame.headers['login']
    passcode = frame.headers['passcode']
    unless login && passcode && @@stompauth.authorized[login] == passcode
      # The original message contained a literal "{self}" because the '#' of
      # the interpolation was missing.
      raise "#{@session_id} #{self} Invalid Login"
    end
  end
  @@log.warn "#{@session_id} Connecting"
  response = StompServer::StompFrame.new("CONNECTED", {'session' => @session_id})
  #
  stomp_send_data(response)
  # Only flag the connection as usable once the CONNECTED frame is on its way.
  @connected = true
end

#disconnect(frame) ⇒ Object

Stomp Protocol - DISCONNECT



236
237
238
239
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 236

# Stomp Protocol - DISCONNECT
#
# Client-requested disconnect: logs the event at warn level and then calls
# close_connection_after_writing (inherited; presumably from
# EventMachine::Connection). +frame+ is not inspected.
def disconnect(frame)
  @@log.warn "#{@session_id} Polite disconnect"
  close_connection_after_writing
end

#handle_transaction(frame, trans, cmd) ⇒ Object

handle_transaction



360
361
362
363
364
365
366
367
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 360

# handle_transaction
#
# Routes a frame that carries a 'transaction' header.
#
# * BEGIN / COMMIT / ABORT are dispatched straight to the matching verb
#   method with the frame and the transaction name.
# * Any other verb is appended to the transaction's queue, to be replayed
#   on COMMIT.
#
# Raises RuntimeError when a non-control frame names a transaction that has
# not been opened.
def handle_transaction(frame, trans, cmd)
  # Transaction control verbs act on the transaction itself.
  return __send__(cmd, frame, trans) if [:begin, :commit, :abort].include?(cmd) # Object#send alias call
  raise "#{@session_id} transaction does not exist" if !@transactions.key?(trans)
  @transactions[trans] << frame
end

#post_init ⇒ Object

EM::Connection.post_init()

Protocol handler post initialization.



100
101
102
103
104
105
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 100

# EM::Connection.post_init()
#
# Per-connection state setup: the connection starts out not connected, with
# no open transactions and a fresh frame recognizer for incoming data.
def post_init
  @connected = false
  @transactions = Hash.new
  @sfr = StompServer::StompFrameRecognizer.new
  @@log.debug("#{@session_id} protocol post_init complete")
end

#process_frame(frame) ⇒ Object

process_frame

Process an individual stomp frame.



339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 339

# process_frame
#
# Process an individual stomp frame:
#
# 1. Reject commands not listed in VALID_COMMANDS.
# 2. Reject anything other than CONNECT while the connection is not yet
#    connected.
# 3. Stamp the frame with this connection's session ID.
# 4. Send a RECEIPT right away when the frame asks for one.
# 5. Route the frame: through handle_transaction when it names a
#    transaction, otherwise directly to the method named after the command.
#
# Raises RuntimeError for an unhandled command or a not-connected session.
def process_frame(frame)
  cmd = frame.command.downcase.to_sym
  unless VALID_COMMANDS.include?(cmd)
    raise "#{@session_id} #{self} Unhandled frame: #{cmd}"
  end
  unless @connected || cmd == :connect
    raise "#{@session_id} #{self} Not connected"
  end
  @@log.debug("#{@session_id} process_frame: #{frame.command}")
  headers = frame.headers
  # Add session ID to the frame headers
  headers['session'] = @session_id
  # Send receipt first if required
  receipt = headers['receipt']
  send_receipt(receipt) if receipt
  # Handle transactional frame if required.
  trans = headers['transaction']
  return handle_transaction(frame, trans, cmd) if trans
  # Otherwise, just route the non-transactional frame.
  __send__(cmd, frame) # Object#send alias call
end

#process_frames ⇒ Object

process_frames

Handle all stomp frames currently in the recognizer’s accumulated array of frames.



330
331
332
333
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 330

# process_frames
#
# Drains the recognizer's accumulated frames, oldest first, handing each one
# to process_frame. Stops when the frame list is empty. Returns nil.
def process_frames
  while (next_frame = @sfr.frames.shift)
    process_frame(next_frame)
  end
end

#receive_data(data) ⇒ Object

EM::Connection.receive_data(data)

Delegate to stomp_receive_data helper.



121
122
123
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 121

# EM::Connection.receive_data(data)
#
# Passes +data+ unchanged to stomp_receive_data, which does the actual
# buffering, frame processing and error handling.
def receive_data(data)
  stomp_receive_data(data)
end

#send(frame) ⇒ Object

Stomp Protocol - SEND

The stomp SEND verb is handled by routing through:

  • receive_data(data)

  • stomp_receive_data

  • process_frames

  • process_frame

  • use Object#__send__ to call this method



251
252
253
254
255
256
257
258
259
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 251

# Stomp Protocol - SEND
#
# Delivers a message frame to its destination. Destinations starting with
# "/queue" go to the queue manager as-is. Everything else goes to the topic
# manager after being stamped with a 'message-id' header built from the
# topic manager's next index.
#
# N.B.: this method replaces Object#send for instances of this class, which
# is why the dispatching code elsewhere uses __send__.
def send(frame)
  unless frame.dest.match(%r|^/queue|)
    # Topic message: set message id before handing it over.
    frame.headers['message-id'] = "msg-#stompcma-#{@@topic_manager.next_index}"
    return @@topic_manager.sendmsg(frame)
  end
  @@queue_manager.sendmsg(frame)
end

#send_data(data) ⇒ Object

EM::Connection.send_data(data)

Just calls super.



138
139
140
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 138

# EM::Connection.send_data(data)
#
# Pass-through to the inherited implementation; +data+ is forwarded
# unchanged (bare +super+ re-sends the method's own arguments).
def send_data(data)
  super
end

#send_error(msg) ⇒ Object

send_error

Send a single error frame.



373
374
375
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 373

# send_error
#
# Send a single ERROR frame. The 'message' header is the fixed text
# 'See below'; +msg+ itself travels as the frame body.
def send_error(msg)
  error_headers = {'message' => 'See below'}
  send_frame("ERROR", error_headers, msg)
end

#send_frame(command, headers = {}, body = '') ⇒ Object

send_frame

Send an individual stomp frame.



381
382
383
384
385
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 381

# send_frame
#
# Build an individual stomp frame and send it.
#
# command:: the frame command, e.g. "ERROR" or "RECEIPT".
# headers:: header Hash; a 'content-length' entry is added to (or
#           overwritten in) this Hash, i.e. the caller's Hash is modified.
# body::    the frame body String.
#
# The 'content-length' header is the body length in bytes, not characters:
# String#size would under-report any body containing multibyte characters.
#
# Returns whatever stomp_send_data returns.
def send_frame(command, headers={}, body='')
  headers['content-length'] = body.bytesize.to_s
  response = StompServer::StompFrame.new(command, headers, body)
  stomp_send_data(response)
end

#send_receipt(id) ⇒ Object

send_receipt

Send a single receipt frame.



391
392
393
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 391

# send_receipt
#
# Send a single RECEIPT frame whose 'receipt-id' header is +id+. No body is
# supplied, so send_frame's default body is used.
def send_receipt(id)
  receipt_headers = { 'receipt-id' => id}
  send_frame("RECEIPT", receipt_headers)
end

#stomp_receive_data(data) ⇒ Object

:startdoc:

stomp_receive_data

Called from EM::Connection.receive_data(data). This is where we begin processing a set of data fresh off the wire.



308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 308

# stomp_receive_data
#
# Called from EM::Connection.receive_data(data). This is where we begin
# processing a set of data fresh off the wire: the data is logged (at most
# the first 256 characters), appended to the frame recognizer's buffer, and
# any complete frames are then processed.
#
# Any StandardError raised along the way is logged, reported to the client
# in an ERROR frame, and the connection is closed once that frame has been
# written. Exceptions outside StandardError (SystemExit, Interrupt and other
# signals, NoMemoryError, ...) are deliberately NOT rescued, so that process
# shutdown is not swallowed and turned into a client ERROR frame.
def stomp_receive_data(data)
  # Limit log message length.
  logdata = data.length > 256 ? data[0, 256] + "...truncated..." : data
  @@log.debug "#{@session_id} receive_data: #{logdata.inspect}"
  # Append all data to the recognizer buffer.
  @sfr << data
  # Process any stomp frames in this set of data.
  process_frames
rescue StandardError => e
  # Array() guards against an exception that carries no backtrace.
  @@log.error "#{@session_id} err: #{e} #{Array(e.backtrace).join("\n")}"
  send_error(e.to_s)
  close_connection_after_writing
end

#stomp_send_data(frame) ⇒ Object

stomp_send_data



397
398
399
400
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 397

# stomp_send_data
#
# Serializes +frame+ with to_s and writes the result to the connection via
# send_data.
#
# The frame is serialized exactly once and the same String is used for both
# the log line and the wire. The debug message is supplied as a block so the
# log string is only built when the logger actually emits debug output.
#
# Returns whatever send_data returns.
def stomp_send_data(frame)
  wire = frame.to_s
  @@log.debug { "#{@session_id} Sending frame #{wire}" }
  send_data(wire)
end

#subscribe(frame) ⇒ Object

Stomp Protocol - SUBSCRIBE

Delegated to the queue or topic manager.



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 266

# Stomp Protocol - SUBSCRIBE
#
# Delegated to the queue or topic manager, chosen by destination prefix.
#
# For "/queue" destinations the queue manager is also told whether the
# client asked for client-side acknowledgement ('ack' header equal to
# 'client') and the subscription id, looked up under the String key 'id'
# first and the Symbol key :id second (nil when neither is set).
# Topic subscriptions receive only the destination and this connection.
def subscribe(frame)
  use_ack = (frame.headers['ack'] == 'client')
  # String key wins over Symbol key; fall back to nil when neither is set.
  subid = frame.headers['id'] || frame.headers[:id] || nil
  #
  if frame.dest =~ %r|^/queue|
    @@queue_manager.subscribe(frame.dest, self, use_ack, subid)
  else
    @@topic_manager.subscribe(frame.dest, self)
  end
end

#unbind ⇒ Object

EM::Connection.unbind()

Unbind the connection.



169
170
171
172
173
174
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 169

# EM::Connection.unbind()
#
# Connection teardown: logs the event, marks the connection as no longer
# connected, and tells first the queue manager and then the topic manager
# to disconnect this connection.
def unbind
  @@log.warn("#{@session_id} Unbind called")
  @connected = false
  [@@queue_manager, @@topic_manager].each do |manager|
    manager.disconnect(self)
  end
end

#unsubscribe(frame) ⇒ Object

Stomp Protocol - UNSUBSCRIBE

Delegated to the queue or topic manager.



289
290
291
292
293
294
295
# File 'lib/stomp_server_ng/protocols/stomp.rb', line 289

# Stomp Protocol - UNSUBSCRIBE
#
# Delegated to the queue or topic manager: destinations starting with
# "/queue" go to the queue manager, everything else to the topic manager.
# The chosen manager receives the destination and this connection.
def unsubscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.unsubscribe(frame.dest, self)
end