Module: StompServer
- Defined in:
- lib/stomp_server.rb
Constant Summary collapse
- VERSION =
'0.9.6'
- VALID_COMMANDS =
[:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect]
Class Method Summary collapse
- .setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object
Instance Method Summary collapse
- #abort(frame, trans = nil) ⇒ Object
- #ack(frame) ⇒ Object
- #begin(frame, trans = nil) ⇒ Object
- #commit(frame, trans = nil) ⇒ Object
- #connect(frame) ⇒ Object
- #disconnect(frame) ⇒ Object
- #handle_transaction(frame, trans, cmd) ⇒ Object
- #post_init ⇒ Object
- #process_frame(frame) ⇒ Object
- #process_frames ⇒ Object
- #receive_data(data) ⇒ Object
- #send_error(msg) ⇒ Object
- #send_frame(command, headers = {}, body = '') ⇒ Object
- #send_message(msg) ⇒ Object
- #send_receipt(id) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #shutdown(msg) ⇒ Object
Here is how we can stop; of course, there is currently no way to get here.
- #subscribe(frame) ⇒ Object
- #unbind ⇒ Object
- #unsubscribe(frame) ⇒ Object
Class Method Details
.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object
12 13 14 15 16 |
# File 'lib/stomp_server.rb', line 12

# Stores the shared collaborators in class variables so that every
# connection sees the same journal, topic manager and queue manager.
# Each argument defaults to a freshly constructed instance; the default
# queue manager is built around whichever journal is passed as +j+.
#
# @param j  journal, stored in @@journal
# @param tm topic manager, stored in @@topic_manager
# @param qm queue manager, stored in @@queue_manager
# @return the queue manager (value of the final assignment)
def self.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j))
  @@topic_manager = tm
  @@journal = j
  @@queue_manager = qm
end
Instance Method Details
#abort(frame, trans = nil) ⇒ Object
128 129 130 131 132 |
# File 'lib/stomp_server.rb', line 128

# Discards an open transaction together with any frames queued under it.
#
# @param frame the ABORT frame (not read by this method)
# @param trans transaction id
# @return the list that was stored for +trans+
# @raise [RuntimeError] "Missing transaction" when +trans+ is nil/false,
#   "transaction does not exist" when +trans+ was never begun
def abort(frame, trans = nil)
  raise "Missing transaction" if !trans

  known = @transactions.key?(trans)
  raise "transaction does not exist" if !known

  @transactions.delete(trans)
end
#ack(frame) ⇒ Object
134 135 136 |
# File 'lib/stomp_server.rb', line 134

# Forwards an ACK frame to the shared queue manager, identifying this
# connection as the acknowledging party.
#
# @param frame the ACK frame
# @return whatever @@queue_manager.ack returns
def ack(frame)
  @@queue_manager.ack(self, frame)
end
#begin(frame, trans = nil) ⇒ Object
111 112 113 114 115 |
# File 'lib/stomp_server.rb', line 111

# Opens a new transaction by registering an empty frame list under +trans+.
#
# @param frame the BEGIN frame (not read by this method)
# @param trans transaction id
# @return [Array] the new, empty list stored for +trans+
# @raise [RuntimeError] "Missing transaction" when +trans+ is nil/false,
#   "transaction exists" when +trans+ is already open
def begin(frame, trans = nil)
  raise "Missing transaction" if !trans

  already_open = @transactions.key?(trans)
  raise "transaction exists" if already_open

  @transactions[trans] = []
end
#commit(frame, trans = nil) ⇒ Object
117 118 119 120 121 122 123 124 125 126 |
# File 'lib/stomp_server.rb', line 117

# Replays every frame queued under +trans+, in arrival order, and then
# forgets the transaction. The 'transaction' header is removed from each
# frame first so that process_frame executes it instead of queueing it again.
#
# @param frame the COMMIT frame (not read by this method)
# @param trans transaction id
# @return the list that was stored for +trans+
# @raise [RuntimeError] "Missing transaction" when +trans+ is nil/false,
#   "transaction does not exist" when +trans+ was never begun
def commit(frame, trans = nil)
  raise "Missing transaction" unless trans
  raise "transaction does not exist" unless @transactions.key?(trans)

  # The block parameter has its own name: reusing +frame+ shadows the
  # method argument (and overwrites it on Ruby 1.8).
  @transactions[trans].each do |queued|
    queued.headers.delete('transaction')
    process_frame(queued)
  end
  @transactions.delete(trans)
end
#connect(frame) ⇒ Object
77 78 79 80 81 82 |
# File 'lib/stomp_server.rb', line 77

# Answers a CONNECT frame with a CONNECTED frame and marks this
# connection as connected. The session header is the fixed string 'wow'.
#
# @param frame the CONNECT frame (not read by this method)
# @return [true]
def connect(frame)
  if $DEBUG
    puts "Connecting"
  end
  send_data(StompFrame.new("CONNECTED", { 'session' => 'wow' }).to_s)
  @connected = true
end
#disconnect(frame) ⇒ Object
138 139 140 141 |
# File 'lib/stomp_server.rb', line 138

# Handles a client-initiated DISCONNECT by asking for the connection to
# be closed once pending output has been written.
#
# @param frame the DISCONNECT frame (not read by this method)
# @return whatever close_connection_after_writing returns
def disconnect(frame)
  if $DEBUG
    puts "Polite disconnect"
  end
  close_connection_after_writing
end
#handle_transaction(frame, trans, cmd) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/stomp_server.rb', line 68

# Routes a frame that carries a 'transaction' header. BEGIN, COMMIT and
# ABORT are dispatched to the method of the same name; any other command
# is queued on the open transaction.
#
# @param frame the incoming frame
# @param trans transaction id taken from the frame's headers
# @param cmd   [Symbol] the frame's command
# @return the handler's result, or the transaction's frame list after the append
# @raise [RuntimeError] "transaction does not exist" when queueing on an
#   unknown transaction
def handle_transaction(frame, trans, cmd)
  case cmd
  when :begin, :commit, :abort
    send(cmd, frame, trans)
  else
    pending = @transactions.fetch(trans) { raise "transaction does not exist" }
    pending << frame
  end
end
#post_init ⇒ Object
18 19 20 21 22 |
# File 'lib/stomp_server.rb', line 18

# Initialises per-connection state: a fresh frame recognizer, an empty
# table of open transactions, and the "not yet connected" flag.
#
# @return [false]
def post_init
  @sfr = StompFrameRecognizer.new
  @transactions = Hash.new
  @connected = false
end
#process_frame(frame) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/stomp_server.rb', line 41

# Validates one frame and dispatches it.
#
# The command name comes off the wire, so it is matched against
# VALID_COMMANDS by name rather than converted with to_sym first; unknown
# commands are rejected without creating a new symbol.
#
# Frames with a 'transaction' header go to handle_transaction. All others
# are sent to the method named after the command, except SEND, which maps
# to #sendmsg. A RECEIPT is sent afterwards when the frame has a
# 'receipt' header.
#
# @param frame object responding to #command and #headers
# @return the result of send_receipt, or nil when no receipt was requested
# @raise [RuntimeError] "Unhandled frame: <name>" for an unknown command,
#   "Not connected" for anything but CONNECT before the session is connected
def process_frame(frame)
  name = frame.command.downcase.to_s
  cmd = VALID_COMMANDS.find { |valid| valid.to_s == name }
  raise "Unhandled frame: #{name}" unless cmd
  raise "Not connected" if !@connected && cmd != :connect
  puts "process_frame #{cmd}" if $DEBUG

  if (trans = frame.headers['transaction'])
    handle_transaction(frame, trans, cmd)
  else
    cmd = :sendmsg if cmd == :send
    puts "Execute #{cmd}" if $DEBUG
    send(cmd, frame)
  end

  receipt = frame.headers['receipt']
  send_receipt(receipt) if receipt
end
#process_frames ⇒ Object
36 37 38 39 |
# File 'lib/stomp_server.rb', line 36

# Drains the recognizer's queue of complete frames, handing each one to
# process_frame in arrival order. Stops at the first nil/false entry or
# when the queue is empty.
#
# @return [nil]
def process_frames
  while (next_frame = @sfr.frames.shift)
    process_frame(next_frame)
  end
end
#receive_data(data) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/stomp_server.rb', line 24

# Feeds raw bytes into the frame recognizer and processes every frame
# that becomes complete.
#
# Any StandardError raised while parsing or processing is reported to the
# client as an ERROR frame, after which the connection is closed once
# that frame has been written. Only StandardError is rescued: rescuing
# Exception would also swallow Interrupt, SystemExit and NoMemoryError,
# which must reach the process.
#
# @param data chunk of bytes read from the socket
def receive_data(data)
  puts "receive_data: #{data.inspect}" if $DEBUG
  @sfr << data
  process_frames
rescue StandardError => e
  puts "err: #{e} #{e.backtrace.join("\n")}" if $DEBUG
  send_error(e.to_s)
  close_connection_after_writing
end
#send_error(msg) ⇒ Object
157 158 159 |
# File 'lib/stomp_server.rb', line 157

# Sends an ERROR frame with no headers and +msg+ as its body.
#
# @param msg error text
# @return whatever send_frame returns
def send_error(msg)
  no_headers = {}
  send_frame("ERROR", no_headers, msg)
end
#send_frame(command, headers = {}, body = '') ⇒ Object
161 162 163 164 |
# File 'lib/stomp_server.rb', line 161

# Builds a StompFrame from the given parts and writes its string form to
# the connection.
#
# @param command frame command, e.g. "ERROR" or "RECEIPT"
# @param headers frame headers
# @param body    frame body
# @return whatever send_data returns
def send_frame(command, headers = {}, body = '')
  send_data(StompFrame.new(command, headers, body).to_s)
end
#send_message(msg) ⇒ Object
148 149 150 151 |
# File 'lib/stomp_server.rb', line 148

# Delivers a frame to this connection as a MESSAGE. The frame's command
# is overwritten in place before it is serialised and written.
#
# @param msg frame responding to #command= and #to_s
# @return whatever send_data returns
def send_message(msg)
  msg.command = "MESSAGE"
  send_data(msg.to_s)
end
#send_receipt(id) ⇒ Object
153 154 155 |
# File 'lib/stomp_server.rb', line 153

# Sends a RECEIPT frame whose 'receipt-id' header echoes +id+.
#
# @param id value of the 'receipt' header on the frame being acknowledged
# @return whatever send_frame returns
def send_receipt(id)
  receipt_headers = { 'receipt-id' => id }
  send_frame("RECEIPT", receipt_headers)
end
#sendmsg(frame) ⇒ Object
84 85 86 87 88 89 90 91 92 |
# File 'lib/stomp_server.rb', line 84

# Handles a SEND frame: stamps it with a message id built from the
# journal's system id and next index, then hands it to the queue manager
# when the destination starts with /queue, otherwise to the topic manager.
#
# @param frame frame responding to #headers and #dest
# @return whatever the chosen manager's sendmsg returns
def sendmsg(frame)
  message_id = "msg-#{@@journal.system_id}-#{@@journal.next_index}"
  frame.headers['message-id'] = message_id

  manager = frame.dest.match(%r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.sendmsg(frame)
end
#shutdown(msg) ⇒ Object
Here is how we can stop; of course, there is currently no way to get here.
64 65 66 |
# File 'lib/stomp_server.rb', line 64

# Stops the EventMachine event loop. Nothing visible in this module
# calls it.
#
# @param msg not used
# @return whatever EventMachine.stop_event_loop returns
def shutdown(msg)
  EventMachine.stop_event_loop
end
#subscribe(frame) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/stomp_server.rb', line 94

# Registers this connection as a subscriber of the frame's destination.
# Destinations starting with /queue go to the queue manager, with a flag
# that is true only when the 'ack' header equals 'client'; everything
# else goes to the topic manager.
#
# @param frame SUBSCRIBE frame responding to #dest and #headers
# @return whatever the chosen manager's subscribe returns
def subscribe(frame)
  unless frame.dest =~ %r|^/queue|
    return @@topic_manager.subscribe(frame.dest, self)
  end

  client_ack = frame.headers['ack'] == 'client'
  @@queue_manager.subscribe(frame.dest, self, client_ack)
end
#unbind ⇒ Object
143 144 145 146 |
# File 'lib/stomp_server.rb', line 143

# Removes this connection from both managers: the queue manager first,
# then the topic manager.
#
# @return whatever @@topic_manager.disconnect returns
def unbind
  @@queue_manager.disconnect(self)
  @@topic_manager.disconnect(self)
end
#unsubscribe(frame) ⇒ Object
103 104 105 106 107 108 109 |
# File 'lib/stomp_server.rb', line 103

# Removes this connection's subscription to the frame's destination,
# using the queue manager for destinations starting with /queue and the
# topic manager for everything else.
#
# @param frame UNSUBSCRIBE frame responding to #dest
# @return whatever the chosen manager's unsubscribe returns
def unsubscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.unsubscribe(frame.dest, self)
end