Class: TorqueBox::Messaging::Destination
- Inherits:
-
Object
- Object
- TorqueBox::Messaging::Destination
- Includes:
- Enumerable, Injectors
- Defined in:
- lib/torquebox/messaging/destination.rb
Constant Summary collapse
- PRIORITY_MAP =
{ :low => 1, :normal => 4, :high => 7, :critical => 9 }
Instance Attribute Summary collapse
-
#connect_options ⇒ Object
Returns the value of attribute connect_options.
-
#connection_factory ⇒ Object
readonly
Returns the value of attribute connection_factory.
-
#enumerable_options ⇒ Object
Returns the value of attribute enumerable_options.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Class Method Summary collapse
Instance Method Summary collapse
- #_dump(depth) ⇒ Object
- #each(&block) ⇒ Object
-
#initialize(destination, connection_factory_or_options = nil) ⇒ Destination
constructor
A new instance of Destination.
- #normalize_options(options) ⇒ Object
- #publish(message, options = {}) ⇒ Object
- #receive(options = {}, &block) ⇒ Object
- #to_s ⇒ Object
- #wait_for_destination(timeout = nil, &block) ⇒ Object
- #with_session(opts = {}) ⇒ Object
Constructor Details
#initialize(destination, connection_factory_or_options = nil) ⇒ Destination
Returns a new instance of Destination.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/torquebox/messaging/destination.rb', line 50
#
# Creates a handle for +destination+ and wraps a connection factory for it.
#
# @param destination the destination name or object; stored unchanged as
#   the +name+ attribute
# @param connection_factory_or_options [Hash, Object, nil] either a Hash of
#   connect options, or a connection factory to wrap directly
# @raise [ArgumentError] if +destination+ is nil or false
def initialize(destination, connection_factory_or_options = nil)
  raise ArgumentError, "destination cannot be nil" unless destination

  if connection_factory_or_options.nil? || connection_factory_or_options.is_a?(Hash)
    options = connection_factory_or_options
    # NOTE(review): +fetch+ is not defined in this class body; presumably it
    # comes from the included Injectors module -- confirm.
    connection_factory = fetch('connection-factory')
    # Don't use our internal connection factory if the user
    # has specified a host or port to connect to
    connection_factory = nil if options && (options[:host] || options[:port])
    @connection_factory = ConnectionFactory.new(connection_factory)
    @connect_options = options || {}
  else
    # Anything that is neither nil nor a Hash is treated as the factory itself.
    @connection_factory = ConnectionFactory.new(connection_factory_or_options)
    @connect_options = {}
  end

  @name = destination
  @enumerable_options = {}
end
Instance Attribute Details
#connect_options ⇒ Object
Returns the value of attribute connect_options.
32 33 34 |
# File 'lib/torquebox/messaging/destination.rb', line 32
#
# Returns the value of attribute connect_options.
#
# @return [Object] the current value of @connect_options
def connect_options
  @connect_options
end
#connection_factory ⇒ Object (readonly)
Returns the value of attribute connection_factory.
29 30 31 |
# File 'lib/torquebox/messaging/destination.rb', line 29
#
# Returns the value of attribute connection_factory (read-only).
#
# @return [Object] the current value of @connection_factory
def connection_factory
  @connection_factory
end
#enumerable_options ⇒ Object
Returns the value of attribute enumerable_options.
31 32 33 |
# File 'lib/torquebox/messaging/destination.rb', line 31
#
# Returns the value of attribute enumerable_options.
#
# @return [Object] the current value of @enumerable_options
def enumerable_options
  @enumerable_options
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
30 31 32 |
# File 'lib/torquebox/messaging/destination.rb', line 30
#
# Returns the value of attribute name (read-only).
#
# @return [Object] the current value of @name
def name
  @name
end
Class Method Details
._load(str) ⇒ Object
46 47 48 |
# File 'lib/torquebox/messaging/destination.rb', line 46
#
# Builds a new instance from +str+ by passing it straight to +new+.
# NOTE(review): presumably the Marshal load hook paired with #_dump -- confirm.
#
# @param str [Object] the value handed to +new+ as the destination
# @return [Object] whatever +new+ returns
def self._load(str)
  new(str)
end
Instance Method Details
#_dump(depth) ⇒ Object
41 42 43 44 |
# File 'lib/torquebox/messaging/destination.rb', line 41
#
# Produces the serialized form of this destination: the +queue_name+ of
# +name+ when it responds to that method, otherwise +name.to_s+.
#
# @param depth [Object] accepted but not used by this method
# @return [Object] +name.queue_name+ or +name.to_s+
def _dump(depth)
  target = name
  return target.queue_name if target.respond_to?(:queue_name)

  target.to_s
end
#each(&block) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/torquebox/messaging/destination.rb', line 93
#
# Iterates over this destination through a browser created on a session,
# yielding whatever the browser's own +each+ yields. The browser is created
# with +enumerable_options[:selector]+ and is always closed afterwards,
# even when the block raises.
#
# @yield forwarded unchanged to the browser's +each+
# @return [Object] the result of the wait_for_destination call
def each(&block)
  wait_for_destination do
    with_session do |session|
      destination = session.java_destination(self)
      browser = session.create_browser(destination, enumerable_options[:selector])
      begin
        browser.each(&block)
      ensure
        browser.close
      end
    end
  end
end
#normalize_options(options) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/torquebox/messaging/destination.rb', line 132
#
# Rewrites the user-facing publish options in +options+ in place:
#
# * +:persistent+ is removed and replaced by +:delivery_mode+, set to the
#   JMS PERSISTENT or NON_PERSISTENT constant.
# * +:priority+ is replaced by an Integer: either the PRIORITY_MAP value for
#   a known name, or the value's +to_i+ when that lies in 0..9.
#
# @param options [Hash] the options to normalize; mutated by this method
# @return [Hash] the same +options+ object
# @raise [ArgumentError] if +:priority+ is neither a PRIORITY_MAP key nor
#   convertible to an Integer in 0..9
def normalize_options(options)
  if options.key?(:persistent)
    options[:delivery_mode] =
      options.delete(:persistent) ? javax.jms::DeliveryMode.PERSISTENT : javax.jms::DeliveryMode.NON_PERSISTENT
  end

  if options.key?(:priority)
    requested = options[:priority]
    level = PRIORITY_MAP[requested]
    # Values without #to_i (e.g. an unknown Symbol) fall through to the
    # ArgumentError below instead of surfacing as a NoMethodError.
    level ||= requested.to_i if requested.respond_to?(:to_i)

    unless level && (0..9).include?(level)
      known = PRIORITY_MAP.keys.map { |k| ":#{k}" }.join(',')
      raise ArgumentError, ":priority must be in the range 0..9, or one of #{known}"
    end

    options[:priority] = level
  end

  options
end
#publish(message, options = {}) ⇒ Object
70 71 72 73 74 75 76 |
# File 'lib/torquebox/messaging/destination.rb', line 70
#
# Publishes +message+ to this destination on a fresh session.
#
# +options[:startup_timeout]+ is handed to wait_for_destination; the whole
# +options+ Hash is handed to with_session, and its normalized form (see
# normalize_options) is handed to the session's +publish+.
#
# @param message [Object] the payload passed through to the session
# @param options [Hash] publish options
# @return [Object] the result of the wait_for_destination call
def publish(message, options = {})
  wait_for_destination(options[:startup_timeout]) do
    with_session(options) do |session|
      session.publish self, message, normalize_options(options)
    end
  end
end
#receive(options = {}, &block) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/torquebox/messaging/destination.rb', line 78
#
# Receives from this destination on a fresh session.
#
# +options[:startup_timeout]+ is handed to wait_for_destination; the whole
# +options+ Hash is handed to both with_session and the session's +receive+.
# When a block is given, the receive runs inside TorqueBox.transaction;
# without a block it is invoked directly.
#
# @param options [Hash] receive options
# @yield forwarded unchanged to the session's +receive+
# @return [Object] the result of the wait_for_destination call
def receive(options = {}, &block)
  wait_for_destination(options[:startup_timeout]) do
    func = lambda do
      with_session(options) do |session|
        session.receive self, options, &block
      end
    end

    if block
      TorqueBox.transaction(&func)
    else
      func.call
    end
  end
end
#to_s ⇒ Object
156 157 158 |
# File 'lib/torquebox/messaging/destination.rb', line 156 def to_s name end |
#wait_for_destination(timeout = nil, &block) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/torquebox/messaging/destination.rb', line 116
#
# Calls +block+, retrying every 0.1s while it raises
# javax.naming.NameNotFoundException or javax.jms.JMSException, until
# +timeout+ milliseconds have elapsed since the first attempt. Once the
# timeout is exceeded the last such exception is re-raised. Any other
# exception propagates immediately.
#
# @param timeout [Numeric, nil] time budget in milliseconds; nil (or false)
#   means the 30_000 default
# @return [Object] the block's return value
def wait_for_destination(timeout = nil, &block)
  timeout ||= 30_000 # 30s default
  start = Time.now
  begin
    block.call
  rescue javax.naming.NameNotFoundException, javax.jms.JMSException
    elapsed = (Time.now - start) * 1000 # seconds -> milliseconds
    raise if elapsed > timeout

    sleep(0.1)
    retry
  end
end
#with_session(opts = {}) ⇒ Object
107 108 109 110 111 112 113 114 |
# File 'lib/torquebox/messaging/destination.rb', line 107
#
# Opens a new connection from +connection_factory+ (using +connect_options+),
# opens a session on it, and yields that session to the caller's block.
#
# @param opts [Hash] only +:tx+ is read here; it selects whether the
#   session is transactional and defaults to true
# @yieldparam session the session opened on the new connection
# @return [Object] the result of +with_new_connection+
def with_session(opts = {})
  transactional = opts.fetch(:tx, true)
  connection_factory.with_new_connection(connect_options) do |connection|
    connection.with_session(transactional) do |session|
      yield session
    end
  end
end