Class: NebulousStomp::StompHandler
- Inherits:
- Object
- NebulousStomp::StompHandler
- Defined in:
- lib/nebulous_stomp/stomp_handler.rb
Overview
A class that handles talking to a STOMP server via the Stomp gem.
You shouldn’t ever need to instantiate this yourself. For listening to messages and responding, use NebulousStomp::Listener. For sending a message and waiting for a response, you want NebulousStomp::Request (passing it a NebulousStomp::Message).
Instance Attribute Summary
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Instance Method Summary
-
#calc_reply_id ⇒ Object
Return the neb-reply-id we’re going to use for this connection.
-
#connected? ⇒ Boolean
Return true if we are connected to the STOMP server.
-
#initialize(connectHash = nil, testConn = nil) ⇒ StompHandler
constructor
Initialise StompHandler by passing the parameter hash.
-
#listen(queue, logid = "") ⇒ Object
Block for incoming messages on a queue.
-
#listen_with_timeout(queue, timeout, logid = "") ⇒ Object
As listen() but give up after yielding a single message, and only wait for a set number of seconds before giving up anyway.
-
#nebulous_on? ⇒ Boolean
Return true if Nebulous is turned on in the parameters.
-
#send_message(queue, mess, logid = "") ⇒ Object
Send a Message to a queue; return the message.
-
#stomp_connect(logid = "") ⇒ Object
Connect to the STOMP server.
-
#stomp_disconnect(logid = "") ⇒ Object
Drop the connection to the STOMP server.
Constructor Details
#initialize(connectHash = nil, testConn = nil) ⇒ StompHandler
Initialise StompHandler by passing the parameter hash.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 23
def initialize(connectHash=nil, testConn=nil)
  @stomp_hash = connectHash ? connectHash.dup : nil
  @test_conn  = testConn
  @conn       = nil
end
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 17
def conn
  @conn
end
Instance Method Details
#calc_reply_id ⇒ Object
Return the neb-reply-id we’re going to use for this connection. Returns nil if Nebulous is turned off; raises ConnectionError if there is no connection.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 213
def calc_reply_id
  return nil unless nebulous_on?
  fail ConnectionError, "Client not connected" unless @conn

  @conn.connection_frame().headers["session"] \
    << "_" \
    << Time.now.to_f.to_s

end
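The reply id is the STOMP session id joined to a timestamp. A minimal sketch of that composition outside the class follows; `build_reply_id` and its arguments are hypothetical and not part of NebulousStomp. It uses string interpolation, which returns a new String, whereas `<<` appends to its receiver in place when the receiver is an ordinary mutable String.

```ruby
# Hypothetical helper, not part of NebulousStomp: compose a reply id from a
# session id and a timestamp, in the same shape calc_reply_id produces.
def build_reply_id(session_id, now = Time.now)
  # Interpolation builds a new String and leaves session_id untouched.
  "#{session_id}_#{now.to_f}"
end
```

Passing the time as an argument is only a convenience for testing; the method in the listing always uses the current time.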
#connected? ⇒ Boolean
Return true if we are connected to the STOMP server.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 66
def connected?
  !!(@conn && @conn.open?())
end
#listen(queue, logid = "") ⇒ Object
Block for incoming messages on a queue. Yield each message.
This method automatically consumes every message it reads, since the assumption is that we are using it for the request-response use case. If you don’t want that, try listen_with_timeout(), instead.
It runs in a thread; if you want it to stop, just stop waiting for it.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 86
def listen(queue, logid="")
  return unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {"Subscribing to #{queue}"}

  Thread.new do
    stomp_connect unless @conn

    # Startle the queue into existence. You can't subscribe to a queue that
    # does not exist, BUT, you can create a queue by posting to it...
    @conn.publish( queue, "boo" )
    @conn.subscribe( queue, {ack: "client-individual"} )

    loop do
      begin
        msg = @conn.poll()
        if msg.nil?
          sleep 0.2
        else
          log_msg(msg, logid)
          ack(msg)
          yield Message.from_stomp(msg) unless msg.body == 'boo' \
                                            || msg.respond_to?(:command) \
                                            && msg.command == "ERROR"
        end # of else
      rescue =>e
        NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"}
      end
    end

  end # of thread

end
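The `unless` condition at the end of the polling loop decides which frames reach the caller's block: the "boo" priming message and ERROR frames are acknowledged but never yielded. The sketch below restates that condition with explicit grouping. `Frame` is a stand-in Struct and `deliverable?` is a hypothetical helper; neither exists in NebulousStomp or the Stomp gem.

```ruby
# Stand-in for a STOMP frame; real frames come from the Stomp gem.
Frame = Struct.new(:command, :body)

# Hypothetical helper mirroring the `unless` condition in listen():
# returns true when the frame should be passed to the caller's block.
def deliverable?(frame)
  priming = frame.body == "boo"
  # `&&` binds tighter than `||`, so the original condition groups this way.
  error = frame.respond_to?(:command) && frame.command == "ERROR"
  !(priming || error)
end
```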
#listen_with_timeout(queue, timeout, logid = "") ⇒ Object
As listen() but give up after yielding a single message, and only wait for a set number of seconds before giving up anyway.
The behaviour here is slightly different from listen(). If your block returns a truthy value, the message is consumed and the method ends. Otherwise the method keeps polling until another message arrives or the timeout is reached; if the timeout passes without the block returning a truthy value, NebulousTimeout is raised.
Put another way: since most values in Ruby are truthy, return false (or nil) from the block when you are examining messages to find the right one and want to see the next.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 135
def listen_with_timeout(queue, timeout, logid="")
  return unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {log_helper logid, "Subscribing to #{queue} with timeout #{timeout}"}

  stomp_connect unless @conn
  id = rand(10000)

  @conn.publish( queue, "boo" )

  done = false
  time = Time.now

  @conn.subscribe(queue, {ack: "client-individual"}, id)
  NebulousStomp.logger.debug(__FILE__) {log_helper logid, "subscribed"}

  loop do
    begin
      msg = @conn.poll()

      if msg.nil?
        # NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Empty message, sleeping"}
        sleep 0.2
      else
        log_msg(msg, logid)

        if msg.respond_to?(:command) && msg.command == "ERROR"
          NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error frame: #{msg.inspect}" }
          ack(msg)

        elsif msg.respond_to?(:body) && msg.body == "boo"
          ack(msg)

        else
          done = yield Message.from_stomp(msg)
          if done
            NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Yield returns true"}
            ack(msg)
          end

        end

      end # of else

    rescue =>e
      NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"}
    end

    break if done

    if timeout && (time + timeout < Time.now)
      NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Timed out"}
      break
    end
  end

  NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Out of loop. done=#{done}"}

  @conn.unsubscribe(queue, {}, id)

  fail NebulousTimeout unless done
end
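The control flow above (poll, hand each message to the block, stop on a truthy result or when the deadline passes) can be illustrated without a broker. This is a simplified sketch under stated assumptions: `poll_until` and `SketchTimeout` are hypothetical names, the message source is any object responding to `#shift` (an Array here), and acknowledgement, logging and error handling are left out.

```ruby
# Hypothetical error class standing in for NebulousTimeout.
class SketchTimeout < StandardError; end

# Poll `source` until the block returns a truthy value or `timeout`
# seconds have elapsed. Raises SketchTimeout if the block never accepts.
def poll_until(source, timeout, pause = 0.01)
  started = Time.now
  done = false

  loop do
    item = source.shift
    if item.nil?
      sleep pause            # nothing waiting; back off briefly
    else
      done = yield item      # a truthy result means "this is the one"
    end

    break if done
    break if timeout && (started + timeout < Time.now)
  end

  raise SketchTimeout unless done
  done
end
```

A block such as `{ |m| m == "wanted" }` skips every other message and ends the loop on the first match, which is the "return false to get another" pattern described above.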
#nebulous_on? ⇒ Boolean
Return true if Nebulous is turned on in the parameters.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 73
def nebulous_on?
  !!(@stomp_hash && !@stomp_hash.empty?)
end
#send_message(queue, mess, logid = "") ⇒ Object
Send a Message to a queue; return the message.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 197
def send_message(queue, mess, logid="")
  return nil unless nebulous_on?
  fail NebulousStomp::NebulousError, "That's not a Message" \
    unless mess.respond_to?(:body_for_stomp) \
        && mess.respond_to?(:headers_for_stomp)

  stomp_connect unless @conn

  headers = mess.headers_for_stomp.reject{|k,v| v.nil? || v == "" }
  @conn.publish(queue, mess.body_for_stomp, headers)
  mess
end
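Before publishing, send_message drops any header whose value is nil or an empty string. The same filter is shown in isolation below; `compact_headers` is a hypothetical helper name, and the header names in the usage note are illustrative only.

```ruby
# Hypothetical helper showing the header filter used in send_message:
# keep only headers that carry a non-nil, non-empty value.
def compact_headers(headers)
  headers.reject { |_key, value| value.nil? || value == "" }
end
```

For example, `compact_headers("reply-to" => "/queue/x", "in-reply-to" => nil, "note" => "")` keeps only the `"reply-to"` entry. Values such as `0` or `false` are kept, since only nil and the empty string are rejected.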
#stomp_connect(logid = "") ⇒ Object
Connect to the STOMP server.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 32
def stomp_connect(logid="")
  return self unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {log_helper logid, "Connecting to STOMP"}
  NebulousStomp.logger.debug(__FILE__) {log_helper logid, @stomp_hash.inspect}

  @conn = @test_conn || Stomp::Connection.new(@stomp_hash)
  fail ConnectionError, "Stomp Connection failed" unless @conn.open?()

  cf = @conn.connection_frame()
  if cf.command == Stomp::CMD_ERROR
    fail ConnectionError, "Connect Error: #{cf.body}"
  end

  self

rescue => err
  raise ConnectionError, err
end
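Because the connect hash is handed unchanged to Stomp::Connection.new, it presumably follows the Stomp gem's hash-style connection parameters. The sketch below shows that general shape; every value is a placeholder, and the set of options your broker needs is an assumption to check against the Stomp gem documentation.

```ruby
# Placeholder connection parameters in the Stomp gem's hash form.
# All values here are illustrative, not defaults of NebulousStomp.
connect_hash = {
  hosts: [
    {
      login:    "guest",      # placeholder credentials
      passcode: "guest",
      host:     "localhost",  # placeholder broker address
      port:     61613,
      ssl:      false
    }
  ],
  reliable: false             # assumption: no automatic reconnect
}
```

An empty or nil hash makes nebulous_on? return false, in which case stomp_connect returns without attempting a connection.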
#stomp_disconnect(logid = "") ⇒ Object
Drop the connection to the STOMP server.
# File 'lib/nebulous_stomp/stomp_handler.rb', line 53
def stomp_disconnect(logid="")
  if @conn
    NebulousStomp.logger.info(__FILE__) {log_helper logid, "STOMP Disconnect"}
    @conn.disconnect() if @conn
    @conn = nil
  end

  self
end