Class: NebulousStomp::StompHandler
- Inherits:
-
Object
- Object
- NebulousStomp::StompHandler
- Defined in:
- lib/nebulous_stomp/stomp_handler.rb
Overview
A Class to deal with talking to STOMP via the Stomp gem.
You shouldn’t ever need to instantiate this yourself. For listening to messages and responding, use NebulousStomp::Listener. For sending a message and waiting for a response, you want NebulousStomp::Request (passing it a NebulousStomp::Message).
Direct Known Subclasses
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
Instance Method Summary collapse
-
#calc_reply_id ⇒ Object
Return the neb-reply-id we’re going to use for this connection.
-
#connected? ⇒ Boolean
return true if we are connected to the STOMP server.
-
#initialize(connectHash = nil, testConn = nil) ⇒ StompHandler
constructor
Initialise StompHandler by passing the parameter hash.
-
#listen(queue, logid = "") ⇒ Object
Block for incoming messages on a queue.
-
#listen_with_timeout(queue, timeout, logid = "") ⇒ Object
As listen() but give up after yielding a single message, and only wait for a set number of seconds before giving up anyway.
-
#nebulous_on? ⇒ Boolean
return true if Nebulous is turned on in the parameters.
-
#send_message(queue, mess, logid = "") ⇒ Object
Send a Message to a queue; return the message.
-
#stomp_connect(logid = "") ⇒ Object
Connect to the STOMP client.
-
#stomp_disconnect(logid = "") ⇒ Object
Drop the connection to the STOMP Client.
Constructor Details
#initialize(connectHash = nil, testConn = nil) ⇒ StompHandler
Initialise StompHandler by passing the parameter hash.
23 24 25 26 27 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 23 def initialize(connectHash=nil, testConn=nil) @stomp_hash = connectHash ? connectHash.dup : nil @test_conn = testConn @conn = nil end |
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
17 18 19 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 17 def conn @conn end |
Instance Method Details
#calc_reply_id ⇒ Object
Return the neb-reply-id we’re going to use for this connection
206 207 208 209 210 211 212 213 214 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 206 def calc_reply_id return nil unless nebulous_on? fail ConnectionError, "Client not connected" unless @conn @conn.connection_frame().headers["session"] \ << "_" \ << Time.now.to_f.to_s end |
#connected? ⇒ Boolean
return true if we are connected to the STOMP server
66 67 68 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 66 def connected? !!(@conn && @conn.open?()) end |
#listen(queue, logid = "") ⇒ Object
Block for incoming messages on a queue. Yield each message.
This method automatically consumes every message it reads, since the assumption is that we are using it for the request-response use case. If you don’t want that, try listen_with_timeout(), instead.
It runs in a thread; if you want it to stop, just stop waiting for it.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 86 def listen(queue, logid="") return unless nebulous_on? NebulousStomp.logger.info(__FILE__) {"Subscribing to #{queue}"} Thread.new do stomp_connect unless @conn # Startle the queue into existence. You can't subscribe to a queue that # does not exist, BUT, you can create a queue by posting to it... @conn.publish( queue, "boo" ) @conn.subscribe( queue, {ack: "client-individual"} ) loop do begin msg = @conn.poll() log_msg(msg, logid) ack(msg) yield Message.from_stomp(msg) \ unless msg.body == 'boo' \ || msg.respond_to?(:command) && msg.command == "ERROR" rescue =>e NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"} end end end # of thread end |
#listen_with_timeout(queue, timeout, logid = "") ⇒ Object
As listen() but give up after yielding a single message, and only wait for a set number of seconds before giving up anyway.
The behaviour here is slightly different than listen(). If you return true from your block, the message will be consumed and the method will end. Otherwise it will continue until it sees another message, or reaches the timeout.
Put another way, since most things are truthy – if you want to examine messages to find the right one, return false from the block to get another.
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 128 def listen_with_timeout(queue, timeout, logid="") return unless nebulous_on? NebulousStomp.logger.info(__FILE__) {log_helper logid, "Subscribing to #{queue} with timeout #{timeout}"} stomp_connect unless @conn id = rand(10000) @conn.publish( queue, "boo" ) done = false time = Time.now @conn.subscribe(queue, {ack: "client-individual"}, id) NebulousStomp.logger.debug(__FILE__) {log_helper logid, "subscribed"} loop do begin msg = @conn.poll() if msg.nil? # NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Empty message, sleeping"} sleep 0.2 else log_msg(msg, logid) if msg.respond_to?(:command) && msg.command == "ERROR" NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error frame: #{msg.inspect}" } ack(msg) elsif msg.respond_to?(:body) && msg.body == "boo" ack(msg) else done = yield Message.from_stomp(msg) if done NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Yield returns true"} ack(msg) end end end # of else rescue =>e NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"} end break if done if timeout && (time + timeout < Time.now) NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Timed out"} break end end NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Out of loop. done=#{done}"} @conn.unsubscribe(queue, {}, id) fail NebulousTimeout unless done end |
#nebulous_on? ⇒ Boolean
return true if Nebulous is turned on in the parameters
73 74 75 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 73 def nebulous_on? !!(@stomp_hash && !@stomp_hash.empty?) end |
#send_message(queue, mess, logid = "") ⇒ Object
Send a Message to a queue; return the message.
190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 190 def (queue, mess, logid="") return nil unless nebulous_on? fail NebulousStomp::NebulousError, "That's not a Message" \ unless mess.respond_to?(:body_for_stomp) \ && mess.respond_to?(:headers_for_stomp) stomp_connect unless @conn headers = mess.headers_for_stomp.reject{|k,v| v.nil? || v == "" } @conn.publish(queue, mess.body_for_stomp, headers) mess end |
#stomp_connect(logid = "") ⇒ Object
Connect to the STOMP client.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 32 def stomp_connect(logid="") return self unless nebulous_on? NebulousStomp.logger.info(__FILE__) {log_helper logid, "Connecting to STOMP"} NebulousStomp.logger.debug(__FILE__) {log_helper logid, @stomp_hash.inspect} @conn = @test_conn || Stomp::Connection.new(@stomp_hash) fail ConnectionError, "Stomp Connection failed" unless @conn.open?() cf = @conn.connection_frame() if cf.command == Stomp::CMD_ERROR fail ConnectionError, "Connect Error: #{cf.body}" end self rescue => err raise ConnectionError, err end |
#stomp_disconnect(logid = "") ⇒ Object
Drop the connection to the STOMP Client
53 54 55 56 57 58 59 60 61 |
# File 'lib/nebulous_stomp/stomp_handler.rb', line 53 def stomp_disconnect(logid="") if @conn NebulousStomp.logger.info(__FILE__) {log_helper logid, "STOMP Disconnect"} @conn.disconnect() if @conn @conn = nil end self end |