Class: StompServer::Protocols::Stomp
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- StompServer::Protocols::Stomp
- Defined in:
- lib/stomp_server/protocols/stomp.rb
Instance Method Summary collapse
- #abort(frame, trans = nil) ⇒ Object
- #ack(frame) ⇒ Object
- #begin(frame, trans = nil) ⇒ Object
- #commit(frame, trans = nil) ⇒ Object
- #connect(frame) ⇒ Object
- #connected? ⇒ Boolean
- #disconnect(frame) ⇒ Object
- #handle_transaction(frame, trans, cmd) ⇒ Object
-
#initialize(*args) ⇒ Stomp
constructor
A new instance of Stomp.
- #post_init ⇒ Object
- #process_frame(frame) ⇒ Object
- #process_frames ⇒ Object
- #receive_data(data) ⇒ Object
- #send_error(msg) ⇒ Object
- #send_frame(command, headers = {}, body = '') ⇒ Object
- #send_message(msg) ⇒ Object
- #send_receipt(id) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #stomp_receive_data(data) ⇒ Object
- #stomp_receive_frame(frame) ⇒ Object
- #stomp_send_data(frame) ⇒ Object
- #subscribe(frame) ⇒ Object
- #unbind ⇒ Object
- #unsubscribe(frame) ⇒ Object
Constructor Details
#initialize(*args) ⇒ Stomp
Returns a new instance of Stomp.
8 9 10 |
# File 'lib/stomp_server/protocols/stomp.rb', line 8 def initialize *args super end |
Instance Method Details
#abort(frame, trans = nil) ⇒ Object
134 135 136 137 138 |
# File 'lib/stomp_server/protocols/stomp.rb', line 134 def abort(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction does not exist" unless @transactions.has_key?(trans) @transactions.delete(trans) end |
#ack(frame) ⇒ Object
140 141 142 |
# File 'lib/stomp_server/protocols/stomp.rb', line 140 def ack(frame) @@queue_manager.ack(self, frame) end |
#begin(frame, trans = nil) ⇒ Object
117 118 119 120 121 |
# File 'lib/stomp_server/protocols/stomp.rb', line 117 def begin(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction exists" if @transactions.has_key?(trans) @transactions[trans] = [] end |
#commit(frame, trans = nil) ⇒ Object
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/stomp_server/protocols/stomp.rb', line 123 def commit(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction does not exist" unless @transactions.has_key?(trans) (@transactions[trans]).each do |frame| frame.headers.delete('transaction') process_frame(frame) end @transactions.delete(trans) end |
#connect(frame) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/stomp_server/protocols/stomp.rb', line 77 def connect(frame) if @@auth_required unless frame.headers['login'] and frame.headers['passcode'] and @@stompauth.[frame.headers['login']] == frame.headers['passcode'] raise "Invalid Login" end end puts "Connecting" if $DEBUG response = StompServer::StompFrame.new("CONNECTED", {'session' => 'wow'}) stomp_send_data(response) @connected = true end |
#connected? ⇒ Boolean
156 157 158 |
# File 'lib/stomp_server/protocols/stomp.rb', line 156 def connected? @connected end |
#disconnect(frame) ⇒ Object
144 145 146 147 |
# File 'lib/stomp_server/protocols/stomp.rb', line 144 def disconnect(frame) puts "Polite disconnect" if $DEBUG close_connection_after_writing end |
#handle_transaction(frame, trans, cmd) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/stomp_server/protocols/stomp.rb', line 68 def handle_transaction(frame, trans, cmd) if [:begin, :commit, :abort].include?(cmd) send(cmd, frame, trans) else raise "transaction does not exist" unless @transactions.has_key?(trans) @transactions[trans] << frame end end |
#post_init ⇒ Object
12 13 14 15 16 |
# File 'lib/stomp_server/protocols/stomp.rb', line 12 def post_init @sfr = StompServer::StompFrameRecognizer.new @transactions = {} @connected = false end |
#process_frame(frame) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/stomp_server/protocols/stomp.rb', line 50 def process_frame(frame) cmd = frame.command.downcase.to_sym raise "Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd) raise "Not connected" if !@connected && cmd != :connect # I really like this code, but my needs are a little trickier # if trans = frame.headers['transaction'] handle_transaction(frame, trans, cmd) else cmd = :sendmsg if cmd == :send send(cmd, frame) end send_receipt(frame.headers['receipt']) if frame.headers['receipt'] end |
#process_frames ⇒ Object
45 46 47 48 |
# File 'lib/stomp_server/protocols/stomp.rb', line 45 def process_frames frame = nil process_frame(frame) while frame = @sfr.frames.shift end |
#receive_data(data) ⇒ Object
18 19 20 |
# File 'lib/stomp_server/protocols/stomp.rb', line 18 def receive_data(data) stomp_receive_data(data) end |
#send_error(msg) ⇒ Object
169 170 171 |
# File 'lib/stomp_server/protocols/stomp.rb', line 169 def send_error(msg) send_frame("ERROR",{'message' => 'See below'},msg) end |
#send_frame(command, headers = {}, body = '') ⇒ Object
178 179 180 181 182 |
# File 'lib/stomp_server/protocols/stomp.rb', line 178 def send_frame(command, headers={}, body='') headers['content-length'] = body.size.to_s response = StompServer::StompFrame.new(command, headers, body) stomp_send_data(response) end |
#send_message(msg) ⇒ Object
160 161 162 163 |
# File 'lib/stomp_server/protocols/stomp.rb', line 160 def (msg) msg.command = "MESSAGE" stomp_send_data(msg) end |
#send_receipt(id) ⇒ Object
165 166 167 |
# File 'lib/stomp_server/protocols/stomp.rb', line 165 def send_receipt(id) send_frame("RECEIPT", { 'receipt-id' => id}) end |
#sendmsg(frame) ⇒ Object
89 90 91 92 93 94 95 96 97 |
# File 'lib/stomp_server/protocols/stomp.rb', line 89 def sendmsg(frame) # set message id if frame.dest.match(%r|^/queue|) @@queue_manager.sendmsg(frame) else frame.headers['message-id'] = "msg-#stompcma-#{@@topic_manager.next_index}" @@topic_manager.sendmsg(frame) end end |
#stomp_receive_data(data) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/stomp_server/protocols/stomp.rb', line 22 def stomp_receive_data(data) begin puts "receive_data: #{data.inspect}" if $DEBUG @sfr << data process_frames rescue Exception => e puts "err: #{e} #{e.backtrace.join("\n")}" send_error(e.to_s) close_connection_after_writing end end |
#stomp_receive_frame(frame) ⇒ Object
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/stomp_server/protocols/stomp.rb', line 34 def stomp_receive_frame(frame) begin puts "receive_frame: #{frame.inspect}" if $DEBUG process_frame(frame) rescue Exception => e puts "err: #{e} #{e.backtrace.join("\n")}" send_error(e.to_s) close_connection_after_writing end end |
#stomp_send_data(frame) ⇒ Object
173 174 175 176 |
# File 'lib/stomp_server/protocols/stomp.rb', line 173 def stomp_send_data(frame) send_data(frame.to_s) puts "Sending frame #{frame.to_s}" if $DEBUG end |
#subscribe(frame) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/stomp_server/protocols/stomp.rb', line 99 def subscribe(frame) use_ack = false use_ack = true if frame.headers['ack'] == 'client' if frame.dest =~ %r|^/queue| @@queue_manager.subscribe(frame.dest, self,use_ack) else @@topic_manager.subscribe(frame.dest, self) end end |
#unbind ⇒ Object
149 150 151 152 153 154 |
# File 'lib/stomp_server/protocols/stomp.rb', line 149 def unbind p "Unbind called" if $DEBUG @connected = false @@queue_manager.disconnect(self) @@topic_manager.disconnect(self) end |
#unsubscribe(frame) ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/stomp_server/protocols/stomp.rb', line 109 def unsubscribe(frame) if frame.dest =~ %r|^/queue| @@queue_manager.unsubscribe(frame.dest,self) else @@topic_manager.unsubscribe(frame.dest,self) end end |