Module: StompServer
- Defined in:
- lib/stomp_server.rb
Constant Summary collapse
- VERSION =
'0.9.2'
- VALID_COMMANDS =
[:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect]
Class Method Summary collapse
- .setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object
Instance Method Summary collapse
- #abort(frame, trans = nil) ⇒ Object
- #ack(frame) ⇒ Object
- #begin(frame, trans = nil) ⇒ Object
- #commit(frame, trans = nil) ⇒ Object
- #connect(frame) ⇒ Object
- #disconnect(frame) ⇒ Object
- #handle_transaction(frame, trans, cmd) ⇒ Object
- #post_init ⇒ Object
- #process_frame(frame) ⇒ Object
- #process_frames ⇒ Object
- #receive_data(data) ⇒ Object
- #send_error(msg) ⇒ Object
- #send_frame(command, headers = {}, body = '') ⇒ Object
- #send_message(msg) ⇒ Object
- #send_receipt(id) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #subscribe(frame) ⇒ Object
- #unbind ⇒ Object
- #unsubscribe(frame) ⇒ Object
Class Method Details
.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object
11 12 13 14 15 |
# File 'lib/stomp_server.rb', line 11
# Stores the journal, topic manager and queue manager in class variables so
# the instance methods of this module can reach them.
#
# @param j  journal object (defaults to a new FrameJournal)
# @param tm topic manager (defaults to a new TopicManager)
# @param qm queue manager (defaults to a new QueueManager built on +j+)
# @return the queue manager, as the value of the last assignment
def self.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j))
  @@topic_manager = tm
  @@journal = j
  @@queue_manager = qm
end
Instance Method Details
#abort(frame, trans = nil) ⇒ Object
117 118 119 120 121 |
# File 'lib/stomp_server.rb', line 117
# Discards an open transaction together with the frames buffered for it.
#
# @param frame the ABORT frame (not inspected here)
# @param trans transaction id; must be truthy
# @return the list of frames that had been buffered under +trans+
# @raise [RuntimeError] when +trans+ is missing or names no open transaction
def abort(frame, trans=nil)
  raise "Missing transaction" unless trans
  # The block only runs when the key is absent, so this is a checked delete.
  @transactions.delete(trans) { raise "transaction does not exist" }
end
#ack(frame) ⇒ Object
123 124 125 |
# File 'lib/stomp_server.rb', line 123
# Forwards an ACK frame, along with this connection, to the queue manager.
#
# @param frame the ACK frame
# @return whatever the queue manager's +ack+ returns
def ack(frame)
  queues = @@queue_manager
  queues.ack(self, frame)
end
#begin(frame, trans = nil) ⇒ Object
100 101 102 103 104 |
# File 'lib/stomp_server.rb', line 100
# Opens a new transaction with an empty buffer of pending frames.
#
# @param frame the BEGIN frame (not inspected here)
# @param trans transaction id; must be truthy
# @return [Array] the new, empty frame buffer
# @raise [RuntimeError] when +trans+ is missing or already open
def begin(frame, trans=nil)
  raise "Missing transaction" unless trans
  raise "transaction exists" if @transactions.key?(trans)
  @transactions.store(trans, [])
end
#commit(frame, trans = nil) ⇒ Object
106 107 108 109 110 111 112 113 114 115 |
# File 'lib/stomp_server.rb', line 106
# Replays every frame buffered under a transaction, then closes it.
#
# Each buffered frame has its 'transaction' header removed before being
# handed to process_frame, so it is handled as an ordinary frame.
#
# @param frame the COMMIT frame (not inspected here)
# @param trans transaction id; must be truthy
# @return the list of frames that had been buffered under +trans+
# @raise [RuntimeError] when +trans+ is missing or names no open transaction
def commit(frame, trans=nil)
  raise "Missing transaction" unless trans
  buffered = @transactions.fetch(trans) { raise "transaction does not exist" }
  buffered.each do |pending|
    pending.headers.delete('transaction')
    process_frame(pending)
  end
  @transactions.delete(trans)
end
#connect(frame) ⇒ Object
67 68 69 70 71 72 |
# File 'lib/stomp_server.rb', line 67
# Answers a CONNECT frame with a CONNECTED frame and marks this connection
# as connected.
#
# The session id sent back is the fixed string 'wow'.
#
# @param frame the CONNECT frame (not inspected here)
# @return [true]
def connect(frame)
  puts "Connecting" if $DEBUG
  send_data(StompFrame.new("CONNECTED", {'session' => 'wow'}).to_s)
  @connected = true
end
#disconnect(frame) ⇒ Object
127 128 129 130 |
# File 'lib/stomp_server.rb', line 127
# Handles a DISCONNECT frame by asking for the connection to be closed once
# pending output has been written.
#
# @param frame the DISCONNECT frame (not inspected here)
# @return whatever close_connection_after_writing returns
def disconnect(frame)
  $DEBUG && puts("Polite disconnect")
  close_connection_after_writing
end
#handle_transaction(frame, trans, cmd) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/stomp_server.rb', line 58
# Routes a frame that carries a 'transaction' header.
#
# BEGIN/COMMIT/ABORT are dispatched to the method of the same name; any
# other command is appended to the transaction's buffer of pending frames.
#
# @param frame the incoming frame
# @param trans transaction id taken from the frame's headers
# @param cmd [Symbol] the frame's command
# @raise [RuntimeError] when a non-control frame names an unknown transaction
def handle_transaction(frame, trans, cmd)
  case cmd
  when :begin, :commit, :abort
    send(cmd, frame, trans)
  else
    buffer = @transactions.fetch(trans) { raise "transaction does not exist" }
    buffer << frame
  end
end
#post_init ⇒ Object
17 18 19 20 21 |
# File 'lib/stomp_server.rb', line 17
# Sets up per-connection state: a frame recognizer, an empty table of open
# transactions, and the "not yet connected" flag.
# NOTE(review): presumably invoked by EventMachine when the connection is
# created - confirm against the including class.
#
# @return [false]
def post_init
  @transactions = {}
  @sfr = StompFrameRecognizer.new
  @connected = false
end
#process_frame(frame) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/stomp_server.rb', line 40
# Validates one parsed frame and dispatches it to the matching handler.
#
# The command is matched case-insensitively against VALID_COMMANDS as a
# string, and the symbol used for dispatch is the one already present in
# that list. The client-supplied text is therefore never converted to a
# symbol, and a frame without a command is rejected as unhandled instead of
# failing with NoMethodError.
#
# Frames with a 'transaction' header go through handle_transaction; all
# others are dispatched directly, with SEND mapped to #sendmsg so it does
# not collide with Object#send. A RECEIPT is sent afterwards when the frame
# carries a 'receipt' header.
#
# @param frame object responding to +command+ and +headers+
# @raise [RuntimeError] "Unhandled frame: ..." for an unknown command
# @raise [RuntimeError] "Not connected" for any command other than CONNECT
#   received before a successful CONNECT
def process_frame(frame)
  name = frame.command.to_s.downcase
  cmd = VALID_COMMANDS.find { |valid| valid.to_s == name }
  raise "Unhandled frame: #{name}" unless cmd
  raise "Not connected" if !@connected && cmd != :connect

  # I really like this code, but my needs are a little trickier
  if (trans = frame.headers['transaction'])
    handle_transaction(frame, trans, cmd)
  else
    cmd = :sendmsg if cmd == :send
    send(cmd, frame)
  end

  send_receipt(frame.headers['receipt']) if frame.headers['receipt']
end
#process_frames ⇒ Object
35 36 37 38 |
# File 'lib/stomp_server.rb', line 35
# Drains the recognizer's frame list, passing each frame to process_frame
# in arrival order. Stops when the list yields nil.
#
# @return [nil]
def process_frames
  while (frame = @sfr.frames.shift)
    process_frame(frame)
  end
end
#receive_data(data) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/stomp_server.rb', line 23
# Feeds incoming bytes to the frame recognizer and processes every frame
# that becomes complete.
#
# Any StandardError raised while parsing or handling frames is reported to
# the client as an ERROR frame, after which the connection is closed once
# output is flushed. Only StandardError is rescued: process-level exceptions
# such as Interrupt, SystemExit and NoMemoryError must propagate instead of
# being turned into a protocol error.
# NOTE(review): presumably the EventMachine data callback - confirm.
#
# @param data [String] raw bytes read from the socket
def receive_data(data)
  puts "receive_data: #{data.inspect}" if $DEBUG
  @sfr << data
  process_frames
rescue StandardError => e
  puts "err: #{e} #{e.backtrace.join("\n")}" if $DEBUG
  send_error(e.to_s)
  close_connection_after_writing
end
#send_error(msg) ⇒ Object
146 147 148 |
# File 'lib/stomp_server.rb', line 146
# Sends an ERROR frame with no headers and +msg+ as its body.
#
# @param msg error text to place in the frame body
# @return whatever send_frame returns
def send_error(msg)
  no_headers = {}
  send_frame("ERROR", no_headers, msg)
end
#send_frame(command, headers = {}, body = '') ⇒ Object
150 151 152 153 |
# File 'lib/stomp_server.rb', line 150
# Builds a StompFrame from the given parts and writes its string form to
# the connection via send_data.
#
# @param command frame command, e.g. "ERROR" or "RECEIPT"
# @param headers [Hash] frame headers
# @param body frame body
# @return whatever send_data returns
def send_frame(command, headers={}, body='')
  send_data(StompFrame.new(command, headers, body).to_s)
end
#send_message(msg) ⇒ Object
137 138 139 140 |
# File 'lib/stomp_server.rb', line 137
# Delivers a frame to this client as a MESSAGE.
#
# The frame's command is overwritten in place with "MESSAGE" before it is
# serialized, so the caller's frame object is mutated.
#
# @param msg frame object responding to +command=+ and +to_s+
# @return whatever send_data returns
def send_message(msg)
  msg.command = "MESSAGE"
  send_data(msg.to_s)
end
#send_receipt(id) ⇒ Object
142 143 144 |
# File 'lib/stomp_server.rb', line 142
# Sends a RECEIPT frame whose 'receipt-id' header is +id+.
#
# @param id value of the 'receipt' header from the frame being acknowledged
# @return whatever send_frame returns
def send_receipt(id)
  receipt_headers = { 'receipt-id' => id }
  send_frame("RECEIPT", receipt_headers)
end
#sendmsg(frame) ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/stomp_server.rb', line 74
# Handles a SEND frame: stamps it with a message id and passes it to the
# queue manager or the topic manager.
#
# The id has the form "msg-<journal system_id>-<journal next_index>".
# Destinations matching %r|^/queue| go to the queue manager; everything
# else goes to the topic manager.
#
# @param frame frame responding to +headers+ and +dest+
# @return whatever the chosen manager's +sendmsg+ returns
def sendmsg(frame)
  # set message id
  frame.headers['message-id'] = "msg-#{@@journal.system_id}-#{@@journal.next_index}"
  manager = frame.dest.match(%r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.sendmsg(frame)
end
#subscribe(frame) ⇒ Object
84 85 86 87 88 89 90 |
# File 'lib/stomp_server.rb', line 84
# Handles a SUBSCRIBE frame by registering this connection for the frame's
# destination: with the queue manager when the destination matches
# %r|^/queue|, with the topic manager otherwise.
#
# @param frame frame responding to +dest+
# @return whatever the chosen manager's +subscribe+ returns
def subscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.subscribe(frame.dest, self)
end
#unbind ⇒ Object
132 133 134 135 |
# File 'lib/stomp_server.rb', line 132
# Tells the queue manager, then the topic manager, that this connection is
# gone.
# NOTE(review): presumably the EventMachine connection-closed callback -
# confirm against the including class.
#
# @return whatever the topic manager's +disconnect+ returns
def unbind
  queues = @@queue_manager
  queues.disconnect(self)
  topics = @@topic_manager
  topics.disconnect(self)
end
#unsubscribe(frame) ⇒ Object
92 93 94 95 96 97 98 |
# File 'lib/stomp_server.rb', line 92
# Handles an UNSUBSCRIBE frame by asking the queue manager (destination
# matches %r|^/queue|) or the topic manager (otherwise) to unsubscribe this
# connection.
# NOTE(review): unlike #subscribe, the destination itself is not passed to
# the manager - verify that the managers' +unsubscribe+ is meant to take
# only the connection.
#
# @param frame frame responding to +dest+
# @return whatever the chosen manager's +unsubscribe+ returns
def unsubscribe(frame)
  manager = (frame.dest =~ %r|^/queue|) ? @@queue_manager : @@topic_manager
  manager.unsubscribe(self)
end