Module: StompServer
- Defined in:
- lib/stomp_server.rb
Constant Summary collapse
- VERSION =
'0.9.5'
- VALID_COMMANDS =
[:connect, :send, :subscribe, :unsubscribe, :begin, :commit, :abort, :ack, :disconnect]
Class Method Summary collapse
Instance Method Summary collapse
- #abort(frame, trans = nil) ⇒ Object
- #ack(frame) ⇒ Object
- #begin(frame, trans = nil) ⇒ Object
- #commit(frame, trans = nil) ⇒ Object
- #connect(frame) ⇒ Object
- #disconnect(frame) ⇒ Object
- #handle_transaction(frame, trans, cmd) ⇒ Object
- #post_init ⇒ Object
- #process_frame(frame) ⇒ Object
- #process_frames ⇒ Object
- #receive_data(data) ⇒ Object
- #send_error(msg) ⇒ Object
- #send_frame(command, headers = {}, body = '') ⇒ Object
- #send_message(msg) ⇒ Object
- #send_receipt(id) ⇒ Object
- #sendmsg(frame) ⇒ Object
- #subscribe(frame) ⇒ Object
- #unbind ⇒ Object
- #unsubscribe(frame) ⇒ Object
Class Method Details
.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) ⇒ Object
12 13 14 15 16 |
# File 'lib/stomp_server.rb', line 12 def self.setup(j = FrameJournal.new, tm = TopicManager.new, qm = QueueManager.new(j)) @@journal = j @@topic_manager = tm @@queue_manager = qm end |
Instance Method Details
#abort(frame, trans = nil) ⇒ Object
118 119 120 121 122 |
# File 'lib/stomp_server.rb', line 118 def abort(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction does not exist" unless @transactions.has_key?(trans) @transactions.delete(trans) end |
#ack(frame) ⇒ Object
124 125 126 |
# File 'lib/stomp_server.rb', line 124 def ack(frame) @@queue_manager.ack(self, frame) end |
#begin(frame, trans = nil) ⇒ Object
101 102 103 104 105 |
# File 'lib/stomp_server.rb', line 101 def begin(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction exists" if @transactions.has_key?(trans) @transactions[trans] = [] end |
#commit(frame, trans = nil) ⇒ Object
107 108 109 110 111 112 113 114 115 116 |
# File 'lib/stomp_server.rb', line 107 def commit(frame, trans=nil) raise "Missing transaction" unless trans raise "transaction does not exist" unless @transactions.has_key?(trans) (@transactions[trans]).each do |frame| frame.headers.delete('transaction') process_frame(frame) end @transactions.delete(trans) end |
#connect(frame) ⇒ Object
68 69 70 71 72 73 |
# File 'lib/stomp_server.rb', line 68 def connect(frame) puts "Connecting" if $DEBUG response = StompFrame.new("CONNECTED", {'session' => 'wow'}) send_data(response.to_s) @connected = true end |
#disconnect(frame) ⇒ Object
128 129 130 131 |
# File 'lib/stomp_server.rb', line 128 def disconnect(frame) puts "Polite disconnect" if $DEBUG close_connection_after_writing end |
#handle_transaction(frame, trans, cmd) ⇒ Object
59 60 61 62 63 64 65 66 |
# File 'lib/stomp_server.rb', line 59 def handle_transaction(frame, trans, cmd) if [:begin, :commit, :abort].include?(cmd) send(cmd, frame, trans) else raise "transaction does not exist" unless @transactions.has_key?(trans) @transactions[trans] << frame end end |
#post_init ⇒ Object
18 19 20 21 22 |
# File 'lib/stomp_server.rb', line 18 def post_init @sfr = StompFrameRecognizer.new @transactions = {} @connected = false end |
#process_frame(frame) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/stomp_server.rb', line 41 def process_frame(frame) cmd = frame.command.downcase.to_sym raise "Unhandled frame: #{cmd}" unless VALID_COMMANDS.include?(cmd) raise "Not connected" if !@connected && cmd != :connect # I really like this code, but my needs are a little trickier # if trans = frame.headers['transaction'] handle_transaction(frame, trans, cmd) else cmd = :sendmsg if cmd == :send send(cmd, frame) end send_receipt(frame.headers['receipt']) if frame.headers['receipt'] end |
#process_frames ⇒ Object
36 37 38 39 |
# File 'lib/stomp_server.rb', line 36 def process_frames frame = nil process_frame(frame) while frame = @sfr.frames.shift end |
#receive_data(data) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/stomp_server.rb', line 24 def receive_data(data) begin puts "receive_data: #{data.inspect}" if $DEBUG @sfr << data process_frames rescue Exception => e puts "err: #{e} #{e.backtrace.join("\n")}" if $DEBUG send_error(e.to_s) close_connection_after_writing end end |
#send_error(msg) ⇒ Object
147 148 149 |
# File 'lib/stomp_server.rb', line 147 def send_error(msg) send_frame("ERROR",{},msg) end |
#send_frame(command, headers = {}, body = '') ⇒ Object
151 152 153 154 |
# File 'lib/stomp_server.rb', line 151 def send_frame(command, headers={}, body='') response = StompFrame.new(command, headers, body) send_data(response.to_s) end |
#send_message(msg) ⇒ Object
138 139 140 141 |
# File 'lib/stomp_server.rb', line 138 def (msg) msg.command = "MESSAGE" send_data(msg.to_s) end |
#send_receipt(id) ⇒ Object
143 144 145 |
# File 'lib/stomp_server.rb', line 143 def send_receipt(id) send_frame("RECEIPT", { 'receipt-id' => id}) end |
#sendmsg(frame) ⇒ Object
75 76 77 78 79 80 81 82 83 |
# File 'lib/stomp_server.rb', line 75 def sendmsg(frame) # set message id frame.headers['message-id'] = "msg-#{@@journal.system_id}-#{@@journal.next_index}" if frame.dest.match(%r|^/queue|) @@queue_manager.sendmsg(frame) else @@topic_manager.sendmsg(frame) end end |
#subscribe(frame) ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/stomp_server.rb', line 85 def subscribe(frame) if frame.dest =~ %r|^/queue| @@queue_manager.subscribe(frame.dest, self) else @@topic_manager.subscribe(frame.dest, self) end end |
#unbind ⇒ Object
133 134 135 136 |
# File 'lib/stomp_server.rb', line 133 def unbind @@queue_manager.disconnect(self) @@topic_manager.disconnect(self) end |
#unsubscribe(frame) ⇒ Object
93 94 95 96 97 98 99 |
# File 'lib/stomp_server.rb', line 93 def unsubscribe(frame) if frame.dest =~ %r|^/queue| @@queue_manager.unsubscribe(self) else @@topic_manager.unsubscribe(self) end end |