Class: TorqueBox::Messaging::Queue

Inherits:
Destination show all
Defined in:
lib/torquebox/messaging/queue.rb

Constant Summary

Constants inherited from Destination

Destination::PRIORITY_MAP

Instance Attribute Summary

Attributes inherited from Destination

#connect_options, #connection_factory, #enumerable_options, #java_destination, #name

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Destination

#_dump, _load, #each, #initialize, list, lookup, #normalize_options, #publish, #receive, #stop, #stop_sync, #wait_for_destination, wait_for_latch, with_destinationizer, #with_session

Constructor Details

This class inherits a constructor from TorqueBox::Messaging::Destination

Class Method Details

.start(name, options = {}) ⇒ Queue?

Creates the queue, starts and return a Queue object.

Parameters:

  • The name of the queue

  • (defaults to: {})

    Optional parameters (a Hash), including:

Options Hash (options):

  • :selector (String)

    The selector for the queue

  • :durable (Boolean)

    If the queue should be durable

  • :exported (Boolean)

    If the queue should be visible in remote JNDI lookups

Returns:

  • if the service is created and started

  • if the service is not created in the specified time (30 s)



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/torquebox/messaging/queue.rb', line 36

def start(name, options={})
  selector = options.fetch(:selector, "")
  durable = options.fetch(:durable, true)
  exported = options.fetch(:exported, false)

  with_destinationizer do |destinationizer|
    destinationizer.create_queue(name, durable, selector, exported)
  end

  new(name, options)
end

Instance Method Details

#consumer_countFixnum

Returns the consumer count connected to the queue.

Returns:

  • The number of consumers



189
190
191
192
193
# File 'lib/torquebox/messaging/queue.rb', line 189

def consumer_count
  with_queue_control do |control|
    control.consumer_count
  end
end

#count_messages(filter = nil) ⇒ Fixnum

Counts messages in the queue.

Examples:

Count messages with :type property set to ‘tomatoe’ or ‘garlic’


@queue.count_messages("type = 'tomatoe' OR type = 'garlic'")

Parameters:

  • (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set all messages will be counted.

Returns:

  • The number of counted messages



140
141
142
143
144
# File 'lib/torquebox/messaging/queue.rb', line 140

def count_messages(filter = nil)
  with_queue_control do |control|
    control.count_messages(filter)
  end
end

#dead_letter_addressString

Returns current dead letter address.

Returns:

  • Current dead letter address. Please note that the destination contains ‘jms.queue’ or ‘jms.topic’ prefixes,



266
267
268
269
270
# File 'lib/torquebox/messaging/queue.rb', line 266

def dead_letter_address
  with_queue_control do |control|
    control.dead_letter_address
  end
end

#dead_letter_address=(address) ⇒ String

Sets the dead letter address.

Please note that you need to provide the *full address* containing the destination name and jms.queue or jms.topic prefixes, for example:

Examples:

Set the destination to /queues/customdead


@queue.dead_letter_address = "jms.queue./queues./customdead"

Set the destination to /topics/customdead


@queue.dead_letter_address = "jms.topic./topics./customdead"

Returns:

  • Current dead letter address



287
288
289
290
291
292
293
# File 'lib/torquebox/messaging/queue.rb', line 287

def dead_letter_address=(address)
  with_queue_control do |control|
    control.set_dead_letter_address(address)
  end

  dead_letter_address
end

#expire_message(id) ⇒ Boolean

Expires message from the queue by its id.

Returns:

  • Returns true if the message was expired, false otherwise.



160
161
162
163
164
# File 'lib/torquebox/messaging/queue.rb', line 160

def expire_message(id)
  with_queue_control do |control|
    control.expire_message(id)
  end
end

#expire_messages(filter = nil) ⇒ Fixnum

Expires messages from the queue.

Parameters:

  • (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set all messages will be expired.

Returns:

  • The number of expired messages



151
152
153
154
155
# File 'lib/torquebox/messaging/queue.rb', line 151

def expire_messages(filter = nil)
  with_queue_control do |control|
    control.expire_messages(filter)
  end
end

#expiry_addressString

Returns current expiry address.

Returns:

  • Current expiry address. Please note that the destination contains ‘jms.queue’ or ‘jms.topic’ prefixes.



234
235
236
237
238
# File 'lib/torquebox/messaging/queue.rb', line 234

def expiry_address
  with_queue_control do |control|
    control.expiry_address
  end
end

#expiry_address=(address) ⇒ String

Sets the expiry address.

Please note that you need to provide the *full address* containing the destination name and jms.queue or jms.topic prefixes, for example:

Examples:

Set the destination to /queues/customexpire


@queue.expiry_address = "jms.queue./queues./customexpire"

Set the destination to /topics/customexpire


@queue.expiry_address = "jms.topic./topics./customexpire"

Returns:

  • Current expiry address



255
256
257
258
259
260
261
# File 'lib/torquebox/messaging/queue.rb', line 255

def expiry_address=(address)
  with_queue_control do |control|
    control.set_expiry_address(address)
  end

  expiry_address
end

#move_message(queue_name, id, reject_duplicates = false) ⇒ Boolean

Moves message for specific id from the queue to another queue specified in the queue_name parameter.

Parameters:

  • The name of the queue to move the messages to

  • Message ID

  • (defaults to: false)

    Specifies if the duplicates should be rejected

Returns:

  • true if the message was moved,false otherwise



225
226
227
228
229
# File 'lib/torquebox/messaging/queue.rb', line 225

def move_message(queue_name, id, reject_duplicates = false)
  with_queue_control do |control|
    control.move_message(id, queue_name, reject_duplicates)
  end
end

#move_messages(queue_name, filter = nil, reject_duplicates = false) ⇒ Fixnum

Moves messages from the queue to another queue specified in the queue_name parameter. Optional reject_duplicates parameter specifies if the duplicates should be rejected.

Parameters:

  • The name of the queue to move the messages to

  • (defaults to: nil)

    Parameter to limit messages to move. If provided nil or empty string, *all messages* will be moved.

  • (defaults to: false)

    Specifies if the duplicates should be rejected

Returns:

  • The number of moved messages



212
213
214
215
216
# File 'lib/torquebox/messaging/queue.rb', line 212

def move_messages(queue_name, filter = nil, reject_duplicates = false)
  with_queue_control do |control|
    control.move_messages(filter, queue_name, reject_duplicates)
  end
end

#pausevoid

This method returns an undefined value.

Pauses a queue.

Messages put into a queue will not be delivered even if there are connected consumers.

When executed on a paused queue, nothing happens.



90
91
92
93
94
# File 'lib/torquebox/messaging/queue.rb', line 90

def pause
  with_queue_control do |control|
    control.pause
  end
end

#paused?Boolean

Returns true if queue is paused, false otherwise.

Returns:



76
77
78
79
80
# File 'lib/torquebox/messaging/queue.rb', line 76

def paused?
  with_queue_control do |control|
    control.is_paused
  end
end

#publish_and_receive(message, options = {}) ⇒ Object

Publishes a message and waits for the reply

Parameters:

  • The message to publish

  • (defaults to: {})

    Optional parameters (a Hash)

Returns:

  • Replied message



54
55
56
57
58
59
60
61
# File 'lib/torquebox/messaging/queue.rb', line 54

def publish_and_receive(message, options={})
  result = nil
  with_session do |session|
    result = session.publish_and_receive(self, message,
                                         normalize_options(options))
  end
  result
end

#receive_and_publish(options = {}, &block) ⇒ void

This method returns an undefined value.

Waits for a message and replies

Parameters:

  • (defaults to: {})

    Optional parameters (a Hash)

  • The block to handle the received message. The return value of the block will be send back to the queue.



68
69
70
71
72
# File 'lib/torquebox/messaging/queue.rb', line 68

def receive_and_publish(options={}, &block)
  with_session do |session|
    session.receive_and_publish(self, normalize_options(options), &block)
  end
end

#remove_message(id) ⇒ Boolean

Removes message from the queue by its id.

Parameters:

  • ID of the message

Returns:

  • true if the message was removed, false otherwise.



125
126
127
128
129
# File 'lib/torquebox/messaging/queue.rb', line 125

def remove_message(id)
  with_queue_control do |control|
    control.remove_message(id)
  end
end

#remove_messages(filter = nil) ⇒ Integer

Removes messages from the queue.

Examples:

Remove messages with type property set


@queue.remove_messages("type = 'tomatoe' OR type = 'garlic'")

Parameters:

  • (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set all messages will be removed.

Returns:

  • Number of removed messages



115
116
117
118
119
# File 'lib/torquebox/messaging/queue.rb', line 115

def remove_messages(filter = nil)
  with_queue_control do |control|
    control.remove_messages(filter)
  end
end

#resumevoid

This method returns an undefined value.

Resumes a queue after it was paused. When executed on a active queue, nothing happens.



100
101
102
103
104
# File 'lib/torquebox/messaging/queue.rb', line 100

def resume
  with_queue_control do |control|
    control.resume
  end
end

#scheduled_messages_countFixnum

Returns the scheduled messages count for this queue.

Returns:

  • The number of scheduled messages



198
199
200
201
202
# File 'lib/torquebox/messaging/queue.rb', line 198

def scheduled_messages_count
  with_queue_control do |control|
    control.scheduled_count
  end
end

#send_message_to_dead_letter_address(id) ⇒ Boolean

Sends message to dead letter address.

Returns:

  • Returns true if the message was sent, false otherwise.



169
170
171
172
173
# File 'lib/torquebox/messaging/queue.rb', line 169

def send_message_to_dead_letter_address(id)
  with_queue_control do |control|
    control.send_message_to_dead_letter_address(id)
  end
end

#send_messages_to_dead_letter_address(filter = nil) ⇒ Fixnum

Sends messages to dead letter address.

Parameters:

  • (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set all messages will be send.

Returns:

  • The number of sent messages



180
181
182
183
184
# File 'lib/torquebox/messaging/queue.rb', line 180

def send_messages_to_dead_letter_address(filter = nil)
  with_queue_control do |control|
    control.send_messages_to_dead_letter_address(filter)
  end
end

#to_sObject



296
297
298
# File 'lib/torquebox/messaging/queue.rb', line 296

def to_s
  "[Queue: #{super}]"
end

#with_queue_controlObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Retrieves the JMSQueueControl implementation for current queue.

API:

  • private



304
305
306
307
308
# File 'lib/torquebox/messaging/queue.rb', line 304

def with_queue_control
  TorqueBox::ServiceRegistry.lookup("jboss.messaging.default") do |server|
    yield server.management_service.get_resource("jms.queue.#{@name}")
  end
end