Class: TorqueBox::Messaging::Destination

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/torquebox/messaging/destination.rb

Direct Known Subclasses

Queue, Topic

Constant Summary collapse

PRIORITY_MAP =
{
  :low => 1,
  :normal => 4,
  :high => 7,
  :critical => 9
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(destination, connection_factory_or_options = nil) ⇒ Destination

Returns a new instance of Destination.

Raises:

  • (ArgumentError)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/torquebox/messaging/destination.rb', line 50

def initialize(destination, connection_factory_or_options = nil)
  raise ArgumentError, "destination cannot be nil" unless destination
  if connection_factory_or_options.nil? || connection_factory_or_options.is_a?( Hash )
    options = connection_factory_or_options
    connection_factory = TorqueBox.fetch( 'connection-factory' )
    unless options.nil?
      # Don't use our internal connection factory if the user
      # has specified a host or port to connect to
      connection_factory = nil if options[:host] or options[:port]
    end
    @connection_factory = ConnectionFactory.new( connection_factory )
    @connect_options = options || {}
  else
    @connection_factory  = ConnectionFactory.new( connection_factory_or_options )
    @connect_options = {}
  end


  if destination.is_a?(javax.jms.Destination )
    if destination.is_a?(javax.jms.Queue)
      @name = destination.queue_name
    else
      @name = destination.topic_name
    end

    @java_destination = destination
  else
    @name = destination
  end


  @enumerable_options  = {}
end

Instance Attribute Details

#connect_optionsObject

Returns the value of attribute connect_options.



33
34
35
# File 'lib/torquebox/messaging/destination.rb', line 33

def connect_options
  @connect_options
end

#connection_factoryObject (readonly)

Returns the value of attribute connection_factory.



29
30
31
# File 'lib/torquebox/messaging/destination.rb', line 29

def connection_factory
  @connection_factory
end

#enumerable_optionsObject

Returns the value of attribute enumerable_options.



32
33
34
# File 'lib/torquebox/messaging/destination.rb', line 32

def enumerable_options
  @enumerable_options
end

#java_destinationObject (readonly)

Returns the value of attribute java_destination.



31
32
33
# File 'lib/torquebox/messaging/destination.rb', line 31

def java_destination
  @java_destination
end

#nameObject (readonly)

Returns the value of attribute name.



30
31
32
# File 'lib/torquebox/messaging/destination.rb', line 30

def name
  @name
end

Class Method Details

._load(str) ⇒ Object



46
47
48
# File 'lib/torquebox/messaging/destination.rb', line 46

def self._load(str)
  self.new( str )
end

.listObject

List all destinations of this application.

Returns:

  • Array of Queue or Topic depending on the destination type.



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/torquebox/messaging/destination.rb', line 212

def list
  # Get the JMS Manager
  TorqueBox::ServiceRegistry.lookup("jboss.messaging.default.jms.manager") do |manager|

    # JMSServerControl will let us grab the deployed queue/topic list
    server_control = Java::org.hornetq.jms.management.impl.JMSServerControlImpl.new(manager)

    # Retrieve the destination list appropriate to the destination type
    if self == TorqueBox::Messaging::Topic
      names = server_control.topic_names
    elsif self == TorqueBox::Messaging::Queue
      names = server_control.queue_names
    else
      names = []
    end

    names.map { |name| self.new(name) }
  end
end

.lookup(name) ⇒ TorqueBox::Messaging::Queue

Lookup a destination of this application by name. A destination could be a queue or topic.

Parameters:

  • name (String)

    of the destination

Returns:



239
240
241
# File 'lib/torquebox/messaging/destination.rb', line 239

def lookup(name)
  list.find { |destination| destination.name == name }
end

.wait_for_latch(latch) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/torquebox/messaging/destination.rb', line 253

def wait_for_latch(latch)
  if latch
    begin
      # Wait for the services to come up for up to 30 seconds
      latch.await(45, java.util.concurrent.TimeUnit::SECONDS)
    rescue
      return false
    end
  end
  
  true
end

.with_destinationizerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



244
245
246
247
248
249
250
# File 'lib/torquebox/messaging/destination.rb', line 244

def with_destinationizer
  service_name = TorqueBox::MSC.deployment_unit.service_name.append('torquebox').append('messaging').append('destinationizer')

  TorqueBox::ServiceRegistry.lookup(service_name) do |destinationizer|
    yield destinationizer
  end
end

Instance Method Details

#_dump(level) ⇒ Object



42
43
44
# File 'lib/torquebox/messaging/destination.rb', line 42

def _dump(level)
  name
end

#each(&block) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/torquebox/messaging/destination.rb', line 132

def each(&block)
  wait_for_destination do
    with_session do |session|
      destination = session.java_destination( self )
      browser = session.create_browser( destination, enumerable_options[:selector] )
      begin
        browser.each(&block)
      ensure
        browser.close
      end
    end
  end
end

#normalize_options(options) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/torquebox/messaging/destination.rb', line 178

def normalize_options(options)
  if options.has_key?(:persistent)
    options[:delivery_mode] =
      options.delete(:persistent) ? javax.jms::DeliveryMode.PERSISTENT : javax.jms::DeliveryMode.NON_PERSISTENT
  end

  if options.has_key?(:priority)
    if PRIORITY_MAP[options[:priority]]
      options[:priority] = PRIORITY_MAP[options[:priority]]
    elsif (0..9) === options[:priority].to_i
      options[:priority] = options[:priority].to_i
    else
      raise ArgumentError.new(":priority must in the range 0..9, or one of #{PRIORITY_MAP.keys.collect {|k| ":#{k}"}.join(',')}")
    end
  end

  options
end

#publish(message, options = {}) ⇒ void

This method returns an undefined value.

Publishes a message to the destination

Parameters:

  • message

    The message to publish

  • options (defaults to: {})

    Optional parameters (a Hash)



109
110
111
112
113
114
115
# File 'lib/torquebox/messaging/destination.rb', line 109

def publish(message, options = {})
  wait_for_destination(options[:startup_timeout]) do
    with_session(options) do |session|
      session.publish self, message, normalize_options(options)
    end
  end
end

#receive(options = {}, &block) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/torquebox/messaging/destination.rb', line 117

def receive(options = {}, &block)
  wait_for_destination(options[:startup_timeout]) do
    func = lambda do
      with_session(options) do |session|
        session.receive self, options, &block
      end
    end
    if block
      TorqueBox.transaction &func
    else
      func.call
    end
  end
end

#stopjava.util.concurrent.CountDownLatch

Note:

This is an asynchronous method.

Stops the destination.

Returns:

  • (java.util.concurrent.CountDownLatch)

    The latch to wait for the task completion

See Also:



89
90
91
92
93
# File 'lib/torquebox/messaging/destination.rb', line 89

def stop
  TorqueBox::Messaging::Destination.with_destinationizer do |destinationizer|
    destinationizer.remove_destination(name)
  end
end

#stop_syncBoolean

Note:

This is a synchronous method.

Stops the destination.

Returns:

  • (Boolean)

    true if the destination was successfully stopped, false otherwise

See Also:

  • stop


100
101
102
# File 'lib/torquebox/messaging/destination.rb', line 100

def stop_sync
  TorqueBox::Messaging::Destination.wait_for_latch(stop)
end

#to_sObject



202
203
204
# File 'lib/torquebox/messaging/destination.rb', line 202

def to_s
  name
end

#wait_for_destination(timeout = nil, &block) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/torquebox/messaging/destination.rb', line 162

def wait_for_destination(timeout=nil, &block)
  timeout ||= 30_000 # 30s default
  start = Time.now
  begin
    block.call
  rescue javax.naming.NameNotFoundException, javax.jms.JMSException
    elapsed = (Time.now - start) * 1000
    if elapsed > timeout
      raise
    else
      sleep(0.1)
      retry
    end
  end
end

#with_session(opts = {}) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/torquebox/messaging/destination.rb', line 146

def with_session(opts = {})
  transactional = opts.fetch(:tx, true)

  # https://issues.jboss.org/browse/TORQUE-1033
  # If there is no encoding for the message, set the default one
  # for the destination. If the encoding for destination isn't set
  # it won't hurt
  opts[:encoding] = @connect_options[:encoding] if opts[:encoding].nil?

  connection_factory.with_new_connection(connect_options, transactional) do |connection|
    connection.with_session do |session|
      yield session
    end
  end
end