Class: Stapfen::Client::JMS
- Inherits: Object
  - Object
  - Stapfen::Client::JMS
- Defined in: lib/stapfen/client/jms.rb
Instance Attribute Summary
- #connection ⇒ Object (readonly)
  Returns the value of attribute connection.
Instance Method Summary
- #can_unreceive? ⇒ Boolean
- #close ⇒ Boolean
  Close the JMS::Connection and the JMS::Session if it’s been created for this client.
- #closed? ⇒ Boolean
  API compatibility method, doesn’t actually indicate that the connection is closed.
- #connect(*args) ⇒ JMS::Connection
  Connect to the broker via JMS and start the JMS session.
- #initialize(configuration) ⇒ JMS (constructor)
  A new instance of JMS.
- #publish(destination, body, headers = {}) ⇒ Object
- #runloop ⇒ Object
- #session ⇒ JMS::Session
  Accessor method which will cache the session if we’ve already created it once.
- #subscribe(destination, headers = {}, &block) ⇒ Object
- #unreceive(message, unreceive_headers) ⇒ Object
  JMS doesn’t implement unreceive in quite the way Stomp does, so we’ll implement it here.
Constructor Details
#initialize(configuration) ⇒ JMS
Returns a new instance of JMS.
# File 'lib/stapfen/client/jms.rb', line 9
def initialize(configuration)
  super()
  @config = configuration
  @connection = nil
end
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
# File 'lib/stapfen/client/jms.rb', line 7
def connection
  @connection
end
Instance Method Details
#can_unreceive? ⇒ Boolean
# File 'lib/stapfen/client/jms.rb', line 86
def can_unreceive?
  true
end
#close ⇒ Boolean
Close the JMS::Connection and the JMS::Session if it’s been created for this client.
# File 'lib/stapfen/client/jms.rb', line 64
def close
  return false unless @connection
  @session.close if @session
  @connection.close
  @connection = nil
  return true
end
#closed? ⇒ Boolean
API compatibility method; it doesn’t actually indicate that the connection is closed. It returns true only if no connection currently exists.
# File 'lib/stapfen/client/jms.rb', line 76
def closed?
  return connection.nil?
end
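The bookkeeping behind #close and #closed? can be tried out without a broker. The following is a minimal sketch, not the Stapfen class itself: FakeConnection and SketchClient are hypothetical stand-ins that copy only the nil-tracking logic from the listings, so that the return values can be observed in plain Ruby.

```ruby
# Hypothetical stand-in for ::JMS::Connection; only #close is modeled.
class FakeConnection
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical client that reproduces only the close/closed? bookkeeping.
class SketchClient
  attr_reader :connection

  def initialize
    @connection = nil
  end

  def connect
    @connection = FakeConnection.new
  end

  # Returns false when there is nothing to close, true otherwise.
  def close
    return false unless @connection
    @connection.close
    @connection = nil
    true
  end

  # True only when no connection object is held.
  def closed?
    connection.nil?
  end
end

client = SketchClient.new
puts client.closed?  # no connection object has been created yet
client.connect
puts client.closed?
puts client.close
puts client.close    # second call finds nothing left to close
```

In this sketch, closed? answers "is a connection object held?" rather than "is the broker link down?", which is the distinction the description above draws.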
#connect(*args) ⇒ JMS::Connection
Connect to the broker via JMS and start the JMS session.
# File 'lib/stapfen/client/jms.rb', line 18
def connect(*args)
  @connection = ::JMS::Connection.new(@config)
  @connection.start
  return @connection
end
#publish(destination, body, headers = {}) ⇒ Object
# File 'lib/stapfen/client/jms.rb', line 33
def publish(destination, body, headers={})
  destination = Stapfen::Destination.from_string(destination)

  session.producer(destination.jms_opts) do |p|
    # Create the JMS typed Message
    message = session.message(body)

    message.delivery_mode = ::JMS::DeliveryMode::PERSISTENT if headers.delete(:persistent)

    # Take the remainder of the headers and push them into the message
    # properties.
    headers.each_pair do |key, value|
      message.setStringProperty(key.to_s, value.to_s)
    end

    p.send(message)
  end
end
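The header handling in #publish can be illustrated in isolation. This is a sketch only: split_publish_headers is a hypothetical helper that does not exist in Stapfen. It assumes the behavior visible in the listing, namely that the :persistent symbol key is removed and treated as a flag, and every remaining key and value is stringified before becoming a message property.

```ruby
# Hypothetical helper, not part of Stapfen. Returns [persistent, properties],
# where properties is what would be passed to setStringProperty.
def split_publish_headers(headers)
  remaining  = headers.dup # the listing mutates its argument; the sketch does not
  persistent = remaining.delete(:persistent) ? true : false

  properties = remaining.each_with_object({}) do |(key, value), props|
    props[key.to_s] = value.to_s
  end

  [persistent, properties]
end

persistent, properties = split_publish_headers({ :persistent => true, :priority => 4 })
puts persistent
puts properties.inspect
```

Note that only the symbol key :persistent acts as the flag in this sketch; a string key 'persistent' would be passed through as an ordinary property.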
#runloop ⇒ Object
# File 'lib/stapfen/client/jms.rb', line 80
def runloop
  loop do
    sleep 1
  end
end
#session ⇒ JMS::Session
Accessor method which will cache the session if we’ve already created it once.
# File 'lib/stapfen/client/jms.rb', line 29
def session
  @session ||= connection.create_session
end
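The caching described for #session is the usual `||=` memoization idiom. The sketch below shows it with hypothetical stand-ins (CountingConnection and SessionHolder are not Stapfen or jruby-jms classes) so that the number of create_session calls can be counted.

```ruby
# Hypothetical stand-in for a connection; counts how many sessions it creates.
class CountingConnection
  attr_reader :sessions_created

  def initialize
    @sessions_created = 0
  end

  def create_session
    @sessions_created += 1
    Object.new # placeholder for a session object
  end
end

# Hypothetical holder that reproduces only the memoized accessor.
class SessionHolder
  attr_reader :connection

  def initialize(connection)
    @connection = connection
  end

  # Creates the session on first use, then returns the cached object.
  def session
    @session ||= connection.create_session
  end
end

holder = SessionHolder.new(CountingConnection.new)
first  = holder.session
second = holder.session
puts first.equal?(second)
puts holder.connection.sessions_created
```

One consequence worth checking against the real class: judging only from the listings on this page, #close closes @session but does not appear to reset it to nil, so the cached object would seem to outlive a later reconnect.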
#subscribe(destination, headers = {}, &block) ⇒ Object
# File 'lib/stapfen/client/jms.rb', line 52
def subscribe(destination, headers={}, &block)
  destination = Stapfen::Destination.from_string(destination)
  connection.on_message(destination.jms_opts) do |m|
    block.call(m)
  end
end
#unreceive(message, unreceive_headers) ⇒ Object
JMS doesn’t implement unreceive in quite the way Stomp does, so we’ll implement it here.
Given a message and a set of unreceive headers, this will deliver the message back to its originating queue a limited number of times, then fail over to the dead letter queue.
# File 'lib/stapfen/client/jms.rb', line 103
def unreceive(message, unreceive_headers)
  return if unreceive_headers[:max_redeliveries].nil? && unreceive_headers[:max_redeliveries].nil?

  destination = message.destination.to_s.sub('queue://','/queue/').sub('topic://','/topic')

  retry_count = message.getStringProperty('retry_count').to_i || 0
  retry_count += 1

  if unreceive_headers[:max_redeliveries] && (retry_count <= unreceive_headers[:max_redeliveries])
    self.publish(destination, message.data, {'retry_count' => retry_count.to_s})
  elsif unreceive_headers[:dead_letter_queue] # Done retrying, send to DLQ
    self.publish(unreceive_headers[:dead_letter_queue], message.data, {:original_destination => destination})
  end
end
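The retry-then-dead-letter decision in #unreceive can be followed more easily as a pure function. This is a sketch under stated assumptions, not the library's code: unreceive_action and stomp_style_destination are hypothetical names, and the logic mirrors the listing as written, including its guard that inspects only :max_redeliveries and its topic rewrite without a trailing slash.

```ruby
# Hypothetical helper mirroring the destination rewrite in the listing.
def stomp_style_destination(jms_name)
  jms_name.to_s.sub('queue://', '/queue/').sub('topic://', '/topic')
end

# Hypothetical helper mirroring the branches of #unreceive.
# previous_retry_count is the 'retry_count' property (a String, or nil if unset).
# Returns [:retry, count], [:dead_letter, queue], or [:none].
def unreceive_action(previous_retry_count, unreceive_headers)
  max = unreceive_headers[:max_redeliveries]
  return [:none] if max.nil? # same effect as the guard in the listing

  retry_count = previous_retry_count.to_i + 1 # nil.to_i is 0

  if retry_count <= max
    [:retry, retry_count]
  elsif unreceive_headers[:dead_letter_queue]
    [:dead_letter, unreceive_headers[:dead_letter_queue]]
  else
    [:none]
  end
end

headers = { :max_redeliveries => 2, :dead_letter_queue => '/queue/dlq' }
puts unreceive_action(nil, headers).inspect  # first failure: redeliver
puts unreceive_action('2', headers).inspect  # retries exhausted: dead letter
puts stomp_style_destination('queue://jobs')
```

With only :dead_letter_queue set, this sketch returns [:none], because the guard as listed checks :max_redeliveries in both of its operands; whether that is intended should be confirmed against the project.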