Class: TorqueBox::Messaging::Destination
- Inherits:
-
Object
- Object
- TorqueBox::Messaging::Destination
- Includes:
- Enumerable
- Defined in:
- lib/torquebox/messaging/destination.rb
Constant Summary collapse
- PRIORITY_MAP =
{ :low => 1, :normal => 4, :high => 7, :critical => 9 }
Instance Attribute Summary collapse
-
#connect_options ⇒ Object
Returns the value of attribute connect_options.
-
#connection_factory ⇒ Object
readonly
Returns the value of attribute connection_factory.
-
#enumerable_options ⇒ Object
Returns the value of attribute enumerable_options.
-
#java_destination ⇒ Object
readonly
Returns the value of attribute java_destination.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
- ._load(str) ⇒ Object
-
.list ⇒ Object
List all destinations of this application.
-
.lookup(name) ⇒ TorqueBox::Messaging::Queue
Looks up a destination of this application by name.
- .wait_for_latch(latch) ⇒ Object private
- .with_destinationizer ⇒ Object private
Instance Method Summary collapse
- #_dump(level) ⇒ Object
- #each(&block) ⇒ Object
-
#initialize(destination, connection_factory_or_options = nil) ⇒ Destination
constructor
A new instance of Destination.
- #normalize_options(options) ⇒ Object
-
#publish(message, options = {}) ⇒ void
Publishes a message to the destination.
- #receive(options = {}, &block) ⇒ Object
-
#stop ⇒ java.util.concurrent.CountDownLatch
Stops the destination.
-
#stop_sync ⇒ Boolean
Stops the destination.
- #to_s ⇒ Object
- #wait_for_destination(timeout = nil, &block) ⇒ Object
- #with_session(opts = {}) ⇒ Object
Constructor Details
#initialize(destination, connection_factory_or_options = nil) ⇒ Destination
Returns a new instance of Destination.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/torquebox/messaging/destination.rb', line 50
#
# Builds a destination wrapper around either a destination name or an
# existing JMS destination object.
#
# NOTE(review): the extracted listing had lost the second parameter name and
# the local holding the options hash; the parameter name is taken from the
# documented signature, the local name `options` is a reconstruction.
#
# @param destination [String, javax.jms.Destination] a destination name, or
#   a JMS queue/topic whose queue_name/topic_name becomes the name
# @param connection_factory_or_options [Hash, Object, nil] nil or a Hash of
#   connect options (the internal 'connection-factory' is then fetched from
#   TorqueBox); any other value is wrapped directly in a ConnectionFactory
# @raise [ArgumentError] if destination is nil
def initialize(destination, connection_factory_or_options = nil)
  raise ArgumentError, "destination cannot be nil" unless destination

  if connection_factory_or_options.nil? || connection_factory_or_options.is_a?( Hash )
    options = connection_factory_or_options
    connection_factory = TorqueBox.fetch( 'connection-factory' )
    unless options.nil?
      # Don't use our internal connection factory if the user
      # has specified a host or port to connect to
      connection_factory = nil if options[:host] || options[:port]
    end
    @connection_factory = ConnectionFactory.new( connection_factory )
    @connect_options = options || {}
  else
    @connection_factory = ConnectionFactory.new( connection_factory_or_options )
    @connect_options = {}
  end

  if destination.is_a?(javax.jms.Destination )
    # A JMS object was handed in: derive the name from it and keep the object.
    if destination.is_a?(javax.jms.Queue)
      @name = destination.queue_name
    else
      @name = destination.topic_name
    end
    @java_destination = destination
  else
    @name = destination
  end

  @enumerable_options = {}
end
Instance Attribute Details
#connect_options ⇒ Object
Returns the value of attribute connect_options.
33 34 35 |
# File 'lib/torquebox/messaging/destination.rb', line 33
#
# Reader for the connect options stored on this destination.
# (The method name was missing from the extracted listing and is restored
# from the attribute heading.)
#
# @return [Object] the current value of @connect_options
def connect_options
  @connect_options
end
#connection_factory ⇒ Object (readonly)
Returns the value of attribute connection_factory.
29 30 31 |
# File 'lib/torquebox/messaging/destination.rb', line 29
#
# Read-only accessor for the connection factory assigned to this destination.
#
# @return [Object] the current value of @connection_factory
def connection_factory
  @connection_factory
end
#enumerable_options ⇒ Object
Returns the value of attribute enumerable_options.
32 33 34 |
# File 'lib/torquebox/messaging/destination.rb', line 32
#
# Reader for the options consulted when the destination is enumerated.
# (The method name was missing from the extracted listing and is restored
# from the attribute heading.)
#
# @return [Object] the current value of @enumerable_options
def enumerable_options
  @enumerable_options
end
#java_destination ⇒ Object (readonly)
Returns the value of attribute java_destination.
31 32 33 |
# File 'lib/torquebox/messaging/destination.rb', line 31
#
# Read-only accessor for the underlying JMS destination object, if any.
#
# @return [Object] the current value of @java_destination
def java_destination
  @java_destination
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
30 31 32 |
# File 'lib/torquebox/messaging/destination.rb', line 30
#
# Read-only accessor for the destination's name.
#
# @return [Object] the current value of @name
def name
  @name
end
Class Method Details
._load(str) ⇒ Object
46 47 48 |
# File 'lib/torquebox/messaging/destination.rb', line 46
#
# Rebuilds an instance from the string produced by #_dump, by passing that
# string straight to the constructor.
#
# @param str [String] the serialized form (the destination name)
# @return [Object] a new instance built from str
def self._load(str)
  new(str)
end
.list ⇒ Object
List all destinations of this application.
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/torquebox/messaging/destination.rb', line 212
#
# Lists all destinations of this application.
#
# Looks up the JMS manager service, wraps it in a JMSServerControlImpl and
# asks it for the topic names (when called on Topic) or the queue names
# (when called on Queue). Any other receiver gets an empty list.
#
# @return [Object] whatever ServiceRegistry.lookup returns for the block;
#   the block itself yields one new instance per deployed name
def list
  TorqueBox::ServiceRegistry.lookup("jboss.messaging.default.jms.manager") do |jms_manager|
    # JMSServerControl exposes the deployed queue/topic names.
    control = Java::org.hornetq.jms.management.impl.JMSServerControlImpl.new(jms_manager)

    # Pick the name list that matches the class this was called on.
    destination_names =
      if self == TorqueBox::Messaging::Topic
        control.topic_names
      elsif self == TorqueBox::Messaging::Queue
        control.queue_names
      else
        []
      end

    destination_names.map { |destination_name| new(destination_name) }
  end
end
.lookup(name) ⇒ TorqueBox::Messaging::Queue
Looks up a destination of this application by name. A destination can be a queue or a topic.
239 240 241 |
# File 'lib/torquebox/messaging/destination.rb', line 239
#
# Finds the first entry of .list whose name equals the given name.
#
# @param name [String] the destination name to search for
# @return [Object, nil] the matching destination, or nil when none matches
def lookup(name)
  list.find do |candidate|
    candidate.name == name
  end
end
.wait_for_latch(latch) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/torquebox/messaging/destination.rb', line 253
#
# Blocks until the given latch is released, for at most 45 seconds.
#
# Previously the result of `await` was discarded, so a timed-out wait was
# still reported as success. The await result is now returned.
#
# @param latch [java.util.concurrent.CountDownLatch, nil] latch to wait on;
#   nil means there is nothing to wait for
# @return [Boolean] true when there is no latch or it was released in time;
#   false when the wait timed out or raised
# @api private
def wait_for_latch(latch)
  return true unless latch

  begin
    # Wait for up to 45 seconds; await returns false if the time elapses
    # before the latch is released.
    latch.await(45, java.util.concurrent.TimeUnit::SECONDS)
  rescue
    false
  end
end
.with_destinationizer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
244 245 246 247 248 249 250 |
# File 'lib/torquebox/messaging/destination.rb', line 244
#
# Yields the deployment's destinationizer service to the caller's block.
#
# The service name is the deployment unit's service name with 'torquebox',
# 'messaging' and 'destinationizer' appended, in that order.
#
# @yield [destinationizer] the service handed over by ServiceRegistry.lookup
# @return [Object] whatever ServiceRegistry.lookup returns
# @api private
def with_destinationizer
  unit_service_name = TorqueBox::MSC.deployment_unit.service_name
  service_name = ['torquebox', 'messaging', 'destinationizer'].inject(unit_service_name) do |prefix, part|
    prefix.append(part)
  end

  TorqueBox::ServiceRegistry.lookup(service_name) { |destinationizer| yield destinationizer }
end
Instance Method Details
#_dump(level) ⇒ Object
42 43 44 |
# File 'lib/torquebox/messaging/destination.rb', line 42
#
# Serialized form of the destination: just its name. The counterpart
# ._load passes this value back to the constructor.
#
# @param level [Integer] unused
# @return [Object] the destination name
def _dump(level)
  name
end
#each(&block) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/torquebox/messaging/destination.rb', line 132
#
# Iterates over the destination through a session browser, yielding every
# entry the browser produces. The browser is always closed afterwards.
#
# NOTE(review): the receiver of `[:selector]` was missing from the extracted
# listing; it is restored as `enumerable_options`, the only options
# attribute on this class that the constructor initialises for enumeration.
#
# @yield [message] each entry produced by the browser
# @return [Object] the result of the wait_for_destination block
def each(&block)
  wait_for_destination do
    with_session do |session|
      destination = session.java_destination( self )
      browser = session.create_browser( destination, enumerable_options[:selector] )
      begin
        browser.each(&block)
      ensure
        # Release the browser even if the caller's block raises.
        browser.close
      end
    end
  end
end
#normalize_options(options) ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/torquebox/messaging/destination.rb', line 178
#
# Translates user-facing publish options into the values the JMS layer
# expects. The hash is modified in place.
#
# * :persistent is removed and replaced by :delivery_mode (PERSISTENT when
#   truthy, NON_PERSISTENT otherwise).
# * :priority is mapped through PRIORITY_MAP when it is one of its keys;
#   otherwise its to_i value is used if that lies in 0..9.
#
# NOTE(review): the extracted listing had lost the method name, the
# parameter and every `options` reference. The trailing `options` return is
# also a reconstruction; without it the method would return nil whenever
# :priority is absent. Confirm against the original file.
#
# @param options [Hash] publish options; mutated
# @return [Hash] the same hash, normalized
# @raise [ArgumentError] if :priority is neither a PRIORITY_MAP key nor in 0..9
def normalize_options(options)
  if options.key?(:persistent)
    options[:delivery_mode] =
      options.delete(:persistent) ? javax.jms::DeliveryMode.PERSISTENT : javax.jms::DeliveryMode.NON_PERSISTENT
  end

  if options.key?(:priority)
    if PRIORITY_MAP[options[:priority]]
      options[:priority] = PRIORITY_MAP[options[:priority]]
    elsif (0..9).include?(options[:priority].to_i)
      options[:priority] = options[:priority].to_i
    else
      raise ArgumentError, ":priority must in the range 0..9, or one of #{PRIORITY_MAP.keys.collect {|k| ":#{k}"}.join(',')}"
    end
  end

  options
end
#publish(message, options = {}) ⇒ void
This method returns an undefined value.
Publishes a message to the destination
109 110 111 112 113 114 115 |
# File 'lib/torquebox/messaging/destination.rb', line 109
#
# Publishes a message to the destination.
#
# Waits for the destination (honouring options[:startup_timeout]), opens a
# session with the given options and hands the message to the session,
# together with the normalized options.
#
# NOTE(review): the parameter names and the `normalize_options` call were
# missing from the extracted listing. They are restored from the documented
# signature `publish(message, options = {})`; the wrapped third argument is
# reconstructed as `normalize_options(options)`.
#
# @param message [Object] the message to publish
# @param options [Hash] publish options; :startup_timeout is passed to
#   wait_for_destination
# @return [void]
def publish(message, options = {})
  wait_for_destination(options[:startup_timeout]) do
    with_session(options) do |session|
      session.publish self, message, normalize_options(options)
    end
  end
end
#receive(options = {}, &block) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/torquebox/messaging/destination.rb', line 117
#
# Receives from the destination through a session.
#
# Waits for the destination (honouring options[:startup_timeout]). When a
# block is given, the receive runs inside TorqueBox.transaction; otherwise
# it is invoked directly.
#
# NOTE(review): the `options` identifier was missing throughout the
# extracted listing and is restored from the documented signature
# `receive(options = {}, &block)`.
#
# @param options [Hash] receive options, forwarded to the session
# @yield forwarded to session.receive
# @return [Object] the result of the wait_for_destination block
def receive(options = {}, &block)
  wait_for_destination(options[:startup_timeout]) do
    func = lambda do
      with_session(options) do |session|
        session.receive self, options, &block
      end
    end

    if block
      TorqueBox.transaction(&func)
    else
      func.call
    end
  end
end
#stop ⇒ java.util.concurrent.CountDownLatch
This is an asynchronous method.
Stops the destination.
89 90 91 92 93 |
# File 'lib/torquebox/messaging/destination.rb', line 89
#
# Asks the destinationizer service to remove this destination by name.
# Asynchronous: the call does not wait for the removal to complete.
#
# @return [Object] whatever with_destinationizer returns; the page's summary
#   documents this as a java.util.concurrent.CountDownLatch
def stop
  TorqueBox::Messaging::Destination.with_destinationizer { |service| service.remove_destination(name) }
end
#stop_sync ⇒ Boolean
This is a synchronous method.
Stops the destination.
100 101 102 |
# File 'lib/torquebox/messaging/destination.rb', line 100
#
# Synchronous variant of #stop: triggers the stop and then waits on the
# value it returned via Destination.wait_for_latch.
#
# @return [Boolean] the result of Destination.wait_for_latch
def stop_sync
  latch = stop
  TorqueBox::Messaging::Destination.wait_for_latch(latch)
end
#to_s ⇒ Object
202 203 204 |
# File 'lib/torquebox/messaging/destination.rb', line 202 def to_s name end |
#wait_for_destination(timeout = nil, &block) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/torquebox/messaging/destination.rb', line 162
#
# Runs the block, retrying every 0.1s while it raises NameNotFoundException
# or JMSException, until the timeout is exceeded. The last exception is then
# re-raised.
#
# @param timeout [Numeric, nil] retry budget in milliseconds; nil means 30_000
# @yield the operation to attempt
# @return [Object] the block's result on the first successful attempt
def wait_for_destination(timeout=nil, &block)
  timeout ||= 30_000 # 30s default
  started_at = Time.now
  begin
    block.call
  rescue javax.naming.NameNotFoundException, javax.jms.JMSException
    waited_ms = (Time.now - started_at) * 1000
    # Budget exhausted: surface the failure to the caller.
    raise if waited_ms > timeout

    sleep(0.1)
    retry
  end
end
#with_session(opts = {}) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/torquebox/messaging/destination.rb', line 146
#
# Opens a new connection and session and yields the session to the block.
#
# NOTE(review): the first argument to `with_new_connection` was missing from
# the extracted listing; it is restored as `connect_options`.
#
# @param opts [Hash] session options; :tx (default true) selects a
#   transactional connection. opts[:encoding] is written in place when unset.
# @yield [session] the session created on the new connection
# @return [Object] the result of with_new_connection
def with_session(opts = {})
  transactional = opts.fetch(:tx, true)

  # https://issues.jboss.org/browse/TORQUE-1033
  # If there is no encoding for the message, set the default one
  # for the destination. If the encoding for destination isn't set
  # it won't hurt.
  # The write into the caller's hash is kept as-is: #publish reuses the same
  # hash after this call, so copying opts here could change what it sends.
  opts[:encoding] = @connect_options[:encoding] if opts[:encoding].nil?

  connection_factory.with_new_connection(connect_options, transactional) do |connection|
    connection.with_session do |session|
      yield session
    end
  end
end