Class: NebulousStomp::StompHandler

Inherits:
Object
Defined in:
lib/nebulous_stomp/stomp_handler.rb

Overview

A class that handles talking to a STOMP server via the Stomp gem.

You shouldn’t ever need to instantiate this yourself. For listening to messages and responding, use NebulousStomp::Listener. For sending a message and waiting for a response, you want NebulousStomp::Request (passing it a NebulousStomp::Message).

Direct Known Subclasses

StompHandlerNull

Constructor Details

#initialize(connectHash = nil, testConn = nil) ⇒ StompHandler

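Initialise StompHandler by passing the connection parameter hash, which is copied. If testConn is given, stomp_connect uses it in place of a new Stomp::Connection.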



# File 'lib/nebulous_stomp/stomp_handler.rb', line 23

def initialize(connectHash=nil, testConn=nil)
  @stomp_hash = connectHash ? connectHash.dup : nil
  @test_conn  = testConn
  @conn       = nil
end
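
The constructor keeps a copy of the hash (connectHash.dup), so later changes the caller makes to top-level keys do not reach the handler. The following is a minimal sketch of that behaviour in plain Ruby, not a use of the class itself; the hash contents are invented for illustration. Note that dup is shallow, so nested objects are still shared.

```ruby
# Illustrative parameter hash; the keys and values are made up for this example.
params = { hosts: [{ host: "localhost", port: 61613 }], reliable: false }

stored = params.dup          # what the constructor keeps

# A later top-level change by the caller does not affect the copy...
params[:reliable] = true
top_level_isolated = (stored[:reliable] == false)

# ...but nested objects are shared, because dup is a shallow copy.
params[:hosts] << { host: "backup", port: 61613 }
nested_shared = (stored[:hosts].length == 2)
```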

Instance Attribute Details

#conn ⇒ Object (readonly)

Returns the value of attribute conn.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 17

def conn
  @conn
end

Instance Method Details

#calc_reply_id ⇒ Object

Return the neb-reply-id we’re going to use for this connection. Returns nil if Nebulous is turned off; raises ConnectionError if there is no connection.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 213

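def calc_reply_id
  return nil unless nebulous_on?
  fail ConnectionError, "Client not connected" unless @conn

  # Build a new string; appending with << would modify the session header
  # of the connection frame in place.
  session = @conn.connection_frame().headers["session"]
  "#{session}_#{Time.now.to_f}"
end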
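
The reply id has the shape session, underscore, timestamp. One thing to keep in mind when building such a string is that String#<< appends to its receiver in place, while interpolation creates a new string. A small sketch in plain Ruby, using a made-up session value instead of a real connection frame:

```ruby
# Made-up header hash standing in for the connection frame's headers.
headers = { "session" => "ID:example-session" }

# Interpolation builds a new string and leaves the header untouched.
reply_id = "#{headers["session"]}_#{Time.now.to_f}"

# By contrast, << modifies its receiver, so here it is applied to a copy.
copy = headers["session"].dup
copy << "_" << Time.now.to_f.to_s
```

Either form gives an id that starts with the session string; only the first leaves no room for accidentally changing the original header.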

#connected? ⇒ Boolean

Returns true if we are connected to the STOMP server.

Returns:

  • (Boolean)


# File 'lib/nebulous_stomp/stomp_handler.rb', line 66

def connected?
  !!(@conn && @conn.open?())
end

#listen(queue, logid = "") ⇒ Object

Listen for incoming messages on a queue, yielding each one to the block.

This method automatically consumes (acknowledges) every message it reads, since the assumption is that we are using it for the request-response use case. If you don’t want that, try listen_with_timeout() instead.

The polling loop runs in a new thread, which is the method’s return value, so the call itself returns straight away. The loop never ends by itself; if you want it to stop, just stop waiting for it.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 86

def listen(queue, logid="")
  return unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {"Subscribing to #{queue}"}

  Thread.new do
    stomp_connect unless @conn

    # Startle the queue into existence. You can't subscribe to a queue that
    # does not exist, BUT, you can create a queue by posting to it...
    @conn.publish( queue, "boo" )
    @conn.subscribe( queue, {ack: "client-individual"} )

    loop do
      begin
        msg = @conn.poll()

        if msg.nil? 
          sleep 0.2

        else
          log_msg(msg, logid)
          ack(msg)

          yield Message.from_stomp(msg) unless msg.body == 'boo' \
                                            ||    msg.respond_to?(:command) \
                                               && msg.command == "ERROR"

        end # of else

      rescue =>e
        NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"}
      end
    end

  end # of thread

end
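
The shape of the loop above (poll, sleep when nothing is waiting, skip the "boo" priming message and ERROR frames, yield the rest) can be tried without a broker. The following is only a sketch under stated assumptions: Frame, FakeConn and listen_sketch are hypothetical stand-ins written for this example, not part of NebulousStomp or the Stomp gem, and acknowledgement and logging are left out.

```ruby
# Hypothetical stand-in for a STOMP frame; only the fields used here.
Frame = Struct.new(:command, :body)

# Hypothetical in-memory connection with the poll contract the listing
# relies on: return a frame if one is waiting, otherwise nil.
class FakeConn
  def initialize(frames)
    @frames = Queue.new
    frames.each { |f| @frames << f }
  end

  def poll
    @frames.pop(true)   # non-blocking pop raises ThreadError when empty
  rescue ThreadError
    nil
  end
end

# Poll in a background thread and hand real messages to the block.
def listen_sketch(conn)
  Thread.new do
    loop do
      msg = conn.poll
      if msg.nil?
        sleep 0.01
      elsif msg.body != "boo" && msg.command != "ERROR"
        yield msg
      end
    end
  end
end

conn = FakeConn.new([
  Frame.new("MESSAGE", "boo"),
  Frame.new("ERROR", "bad frame"),
  Frame.new("MESSAGE", "hello"),
])

received = Queue.new
worker = listen_sketch(conn) { |m| received << m.body }

first = received.pop   # blocks until the thread yields a real message
worker.kill            # the loop never ends on its own, so stop it explicitly
worker.join
```

Because the method hands back the Thread, the caller decides how long to wait for it; here the main thread waits for one message and then stops the worker.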

#listen_with_timeout(queue, timeout, logid = "") ⇒ Object

As listen(), but give up after yielding a single message, and only wait for a set number of seconds before giving up anyway. Unlike listen(), this method runs in the calling thread and blocks until it finishes.

The behaviour here is slightly different from listen(). If you return a truthy value from your block, the message will be consumed and the method will end. Otherwise the message is left unacknowledged and the method carries on until it sees another message or reaches the timeout, at which point it raises NebulousTimeout.

Put another way, since most things are truthy: if you want to examine messages to find the right one, return false from the block to get another.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 135

def listen_with_timeout(queue, timeout, logid="")
  return unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {log_helper logid, "Subscribing to #{queue} with timeout #{timeout}"}

  stomp_connect unless @conn
  id = rand(10000)

  @conn.publish( queue, "boo" )
  
  done = false
  time = Time.now

  @conn.subscribe(queue, {ack: "client-individual"}, id) 
  NebulousStomp.logger.debug(__FILE__) {log_helper logid, "subscribed"}

  loop do
    begin
      msg = @conn.poll()

      if msg.nil?
        # NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Empty message, sleeping"}
        sleep 0.2
      else
        log_msg(msg, logid)

        if msg.respond_to?(:command) && msg.command == "ERROR"
          NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error frame: #{msg.inspect}" }
          ack(msg)
        elsif msg.respond_to?(:body) && msg.body == "boo"
          ack(msg)
        else
          done = yield Message.from_stomp(msg) 
          if done
            NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Yield returns true"}
            ack(msg)
          end
        end

      end # of else

    rescue =>e
      NebulousStomp.logger.error(__FILE__) {log_helper logid, "Error during polling: #{e}"}
    end

    break if done

    if timeout && (time + timeout < Time.now)
      NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Timed out"}
      break
    end
  end

  NebulousStomp.logger.debug(__FILE__) {log_helper logid, "Out of loop. done=#{done}"}

  @conn.unsubscribe(queue, {}, id)

  fail NebulousTimeout unless done
end
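
The control flow above (offer each message to the block, stop on a truthy return, otherwise keep going until the deadline) can be sketched in isolation. This is an illustration under assumptions, not the library's code: first_accepted and SketchTimeout are hypothetical names, and an array stands in for the queue.

```ruby
# Hypothetical stand-in for NebulousTimeout.
class SketchTimeout < StandardError; end

# Offer each message to the block until the block returns a truthy value
# or the timeout (in seconds) has passed.
def first_accepted(messages, timeout)
  pending = messages.dup
  started = Time.now
  done    = false

  loop do
    msg = pending.shift
    if msg.nil?
      sleep 0.01                 # nothing waiting; poll again shortly
    else
      done = yield msg           # truthy means "this is the one"
    end

    break if done
    break if timeout && (started + timeout < Time.now)
  end

  raise SketchTimeout unless done
end

# Skip messages until the wanted one turns up.
wanted = nil
first_accepted(%w[alpha beta gamma], 1) do |msg|
  wanted = msg if msg == "beta"
  msg == "beta"
end

# A block that never accepts anything ends in a timeout.
timed_out =
  begin
    first_accepted(%w[alpha], 0.05) { |_msg| false }
    false
  rescue SketchTimeout
    true
  end
```

Returning the comparison itself from the block, as in the first call, avoids the trap of accidentally returning a truthy value and accepting the wrong message.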

#nebulous_on? ⇒ Boolean

Returns true if Nebulous is turned on in the parameters.

Returns:

  • (Boolean)


# File 'lib/nebulous_stomp/stomp_handler.rb', line 73

def nebulous_on?
  !!(@stomp_hash && !@stomp_hash.empty?)
end
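
This check is the switch the other methods look at first: with a nil or empty parameter hash, listen, listen_with_timeout, send_message and calc_reply_id return early without touching the connection. A sketch of the same truthiness test as a standalone lambda (the name switched_on and the sample hashes are made up for this example):

```ruby
# Same test as the listing: the hash must be present and non-empty.
switched_on = ->(hash) { !!(hash && !hash.empty?) }

results = [nil, {}, { hosts: [] }].map { |h| switched_on.call(h) }
# results is [false, false, true]
```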

#send_message(queue, mess, logid = "") ⇒ Object

Send a Message to a queue; return the message.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 197

def send_message(queue, mess, logid="")
  return nil unless nebulous_on?
  fail NebulousStomp::NebulousError, "That's not a Message" \
    unless mess.respond_to?(:body_for_stomp) \
        && mess.respond_to?(:headers_for_stomp)

  stomp_connect unless @conn

  headers = mess.headers_for_stomp.reject{|k,v| v.nil? || v == "" }
  @conn.publish(queue, mess.body_for_stomp, headers)
  mess
end
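
Two details of the listing are easy to try on their own: the duck-type check (anything that responds to body_for_stomp and headers_for_stomp is accepted) and the removal of empty headers before publishing. A minimal sketch, assuming a hypothetical SketchMessage struct in place of a real NebulousStomp::Message; the header names and values are illustrative only.

```ruby
# Hypothetical message object; it only needs the two methods the check asks for.
SketchMessage = Struct.new(:body_for_stomp, :headers_for_stomp)

mess = SketchMessage.new(
  "hello",
  { "neb-reply-to" => "/queue/replies", "neb-in-reply-to" => nil, "neb-reply-id" => "" }
)

# The same duck-type test the method applies before sending.
looks_like_message = mess.respond_to?(:body_for_stomp) &&
                     mess.respond_to?(:headers_for_stomp)

# Drop headers whose value is nil or an empty string.
headers = mess.headers_for_stomp.reject { |_k, v| v.nil? || v == "" }
# headers is {"neb-reply-to" => "/queue/replies"}
```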

#stomp_connect(logid = "") ⇒ Object

Connect to the STOMP server. Returns self; any failure is raised as a ConnectionError.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 32

def stomp_connect(logid="")
  return self unless nebulous_on?
  NebulousStomp.logger.info(__FILE__) {log_helper logid, "Connecting to STOMP"} 
  NebulousStomp.logger.debug(__FILE__) {log_helper logid, @stomp_hash.inspect}

  @conn = @test_conn || Stomp::Connection.new(@stomp_hash)
  fail ConnectionError, "Stomp Connection failed" unless @conn.open?()

  cf = @conn.connection_frame()
  if cf.command == Stomp::CMD_ERROR
    fail ConnectionError, "Connect Error: #{cf.body}"
  end

  self
rescue => err
  raise ConnectionError, err
end
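
The method-level rescue means that any StandardError raised while connecting reaches the caller as a ConnectionError. A sketch of that wrapping pattern in plain Ruby, with SketchConnectionError and connect_sketch as hypothetical stand-ins for the real class and method:

```ruby
# Hypothetical stand-in for ConnectionError.
class SketchConnectionError < StandardError; end

# Run the block; re-raise anything that goes wrong as one error class.
def connect_sketch
  yield
  :connected
rescue => err
  raise SketchConnectionError, err
end

ok = connect_sketch { :no_problem }

wrapped =
  begin
    connect_sketch { raise IOError, "socket closed" }
  rescue SketchConnectionError => e
    e
  end
```

Raising inside a rescue clause also records the original exception as the new one's cause, so the underlying error is still available to the caller for logging.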

#stomp_disconnect(logid = "") ⇒ Object

Drop the connection to the STOMP server, if there is one. Returns self.



# File 'lib/nebulous_stomp/stomp_handler.rb', line 53

def stomp_disconnect(logid="")
  if @conn
    NebulousStomp.logger.info(__FILE__) {log_helper logid, "STOMP Disconnect"}
    @conn.disconnect() if @conn
    @conn = nil
  end

  self
end