Class: TorqueBox::Messaging::Queue

Inherits:
Destination
Defined in:
lib/torquebox/messaging/queue.rb

Constant Summary

Constants inherited from Destination

Destination::PRIORITY_MAP

Instance Attribute Summary

Attributes inherited from Destination

#connect_options, #connection_factory, #enumerable_options, #java_destination, #name

Class Method Summary

Instance Method Summary

Methods inherited from Destination

#_dump, _load, #each, #initialize, list, lookup, #normalize_options, #publish, #receive, #stop, #stop_sync, #wait_for_destination, wait_for_latch, with_destinationizer, #with_session

Constructor Details

This class inherits a constructor from TorqueBox::Messaging::Destination

Class Method Details

.start(name, options = {}) ⇒ Queue?

Creates and starts the queue, then returns a Queue object.

Parameters:

  • name

    The name of the queue

  • options (defaults to: {})

    Optional parameters (a Hash), including:

Options Hash (options):

  • :selector (String)

    The selector for the queue

  • :durable (Boolean)

    If the queue should be durable

  • :exported (Boolean)

    If the queue should be visible in remote JNDI lookups

Returns:

  • (Queue)

    if the service is created and started

  • (nil)

    if the service is not created in the specified time (30 s)



# File 'lib/torquebox/messaging/queue.rb', line 36

def start(name, options={})
  selector = options.fetch(:selector, "")
  durable = options.fetch(:durable, true)
  exported = options.fetch(:exported, false)

  with_destinationizer do |destinationizer|
    destinationizer.create_queue(name, durable, selector, exported)
  end

  new(name, options)
end
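
The option handling in the listing above relies on Hash#fetch with a fallback value. The following is a minimal sketch of that behaviour only; start_options is a hypothetical helper written for illustration and is not part of TorqueBox. It does not create a queue.

```ruby
# Hypothetical helper (not part of TorqueBox) that mirrors how .start
# resolves its options with Hash#fetch and a fallback value.
def start_options(options = {})
  {
    selector: options.fetch(:selector, ""),
    durable:  options.fetch(:durable, true),
    exported: options.fetch(:exported, false)
  }
end

defaults     = start_options
non_durable  = start_options(durable: false)
explicit_nil = start_options(durable: nil)
```

Note that fetch only falls back when the key is absent, so passing :durable => nil yields nil rather than the default of true.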

Instance Method Details

#consumer_count ⇒ Fixnum

Returns the number of consumers connected to the queue.

Returns:

  • (Fixnum)

    The number of consumers



# File 'lib/torquebox/messaging/queue.rb', line 189

def consumer_count
  with_queue_control do |control|
    control.consumer_count
  end
end

#count_messages(filter = nil) ⇒ Fixnum

Counts messages in the queue.

Examples:

Count messages with the type property set to ‘tomato’ or ‘garlic’


@queue.count_messages("type = 'tomato' OR type = 'garlic'")

Parameters:

  • filter (String) (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set, all messages will be counted.

Returns:

  • (Fixnum)

    The number of counted messages



# File 'lib/torquebox/messaging/queue.rb', line 140

def count_messages(filter = nil)
  with_queue_control do |control|
    control.count_messages(filter)
  end
end
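
Filter strings such as the one in the example above can be assembled programmatically. The sketch below uses a hypothetical helper, any_of_filter, which is not part of TorqueBox. It assumes that string literals in the filter syntax escape a single quote by doubling it, as in SQL92 and JMS selectors.

```ruby
# Hypothetical helper (not part of TorqueBox): builds a filter that matches
# any of the given values for a single message property.
# Assumption: a single quote inside a string literal is escaped by doubling it.
def any_of_filter(property, values)
  values.map { |value|
    escaped = value.to_s.gsub("'", "''")
    "#{property} = '#{escaped}'"
  }.join(" OR ")
end

filter = any_of_filter("type", ["tomato", "garlic"])
quoted = any_of_filter("name", ["O'Brien"])
```

The resulting string could then be passed as the filter argument, for example @queue.count_messages(filter).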

#dead_letter_address ⇒ String

Returns the current dead letter address.

Returns:

  • (String)

    Current dead letter address. Please note that the destination contains the ‘jms.queue’ or ‘jms.topic’ prefix.



# File 'lib/torquebox/messaging/queue.rb', line 266

def dead_letter_address
  with_queue_control do |control|
    control.dead_letter_address
  end
end

#dead_letter_address=(address) ⇒ String

Sets the dead letter address.

Please note that you need to provide the *full address* containing the destination name and jms.queue or jms.topic prefixes, for example:

Examples:

Set the destination to /queues/customdead


@queue.dead_letter_address = "jms.queue./queues./customdead"

Set the destination to /topics/customdead


@queue.dead_letter_address = "jms.topic./topics./customdead"

Returns:

  • (String)

    Current dead letter address



# File 'lib/torquebox/messaging/queue.rb', line 287

def dead_letter_address=(address)
  with_queue_control do |control|
    control.set_dead_letter_address(address)
  end

  dead_letter_address
end
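
One Ruby detail is worth keeping in mind with setters shaped like the one above: an assignment expression always evaluates to its right-hand side, so the value returned by the method body is only visible when the setter is invoked through send or public_send. The sketch below demonstrates this with SketchQueue, a stand-in class that is not the TorqueBox implementation. The whitespace stripping is a made-up transformation, included only so the two values differ.

```ruby
# Stand-in class (NOT the TorqueBox implementation) with a setter that has
# the same shape as the listing above: store the value, then return the getter.
class SketchQueue
  def dead_letter_address
    @address
  end

  def dead_letter_address=(address)
    # Hypothetical normalization, used only to make the difference visible.
    @address = address.strip
    dead_letter_address
  end
end

queue = SketchQueue.new

# Assignment syntax evaluates to the right-hand side, not the method's return value.
via_assignment = (queue.dead_letter_address = "jms.queue./queues./customdead ")

# public_send exposes what the method body actually returns.
via_send = queue.public_send(:dead_letter_address=, "jms.queue./queues./customdead ")
```

To read back the address as the server reports it, calling the getter after the assignment is the more direct option.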

#expire_message(id) ⇒ Boolean

Expires a message from the queue by its ID.

Returns:

  • (Boolean)

    Returns true if the message was expired, false otherwise.



# File 'lib/torquebox/messaging/queue.rb', line 160

def expire_message(id)
  with_queue_control do |control|
    control.expire_message(id)
  end
end

#expire_messages(filter = nil) ⇒ Fixnum

Expires messages from the queue.

Parameters:

  • filter (String) (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set, all messages will be expired.

Returns:

  • (Fixnum)

    The number of expired messages



# File 'lib/torquebox/messaging/queue.rb', line 151

def expire_messages(filter = nil)
  with_queue_control do |control|
    control.expire_messages(filter)
  end
end

#expiry_address ⇒ String

Returns the current expiry address.

Returns:

  • (String)

    Current expiry address. Please note that the destination contains the ‘jms.queue’ or ‘jms.topic’ prefix.



# File 'lib/torquebox/messaging/queue.rb', line 234

def expiry_address
  with_queue_control do |control|
    control.expiry_address
  end
end

#expiry_address=(address) ⇒ String

Sets the expiry address.

Please note that you need to provide the *full address* containing the destination name and jms.queue or jms.topic prefixes, for example:

Examples:

Set the destination to /queues/customexpire


@queue.expiry_address = "jms.queue./queues./customexpire"

Set the destination to /topics/customexpire


@queue.expiry_address = "jms.topic./topics./customexpire"

Returns:

  • (String)

    Current expiry address



# File 'lib/torquebox/messaging/queue.rb', line 255

def expiry_address=(address)
  with_queue_control do |control|
    control.set_expiry_address(address)
  end

  expiry_address
end

#move_message(queue_name, id, reject_duplicates = false) ⇒ Boolean

Moves the message with the given ID from this queue to the queue named by the queue_name parameter.

Parameters:

  • queue_name (String)

    The name of the queue to move the message to

  • id (String)

    Message ID

  • reject_duplicates (Boolean) (defaults to: false)

    Specifies whether duplicates should be rejected

Returns:

  • (Boolean)

    true if the message was moved, false otherwise



# File 'lib/torquebox/messaging/queue.rb', line 225

def move_message(queue_name, id, reject_duplicates = false)
  with_queue_control do |control|
    control.move_message(id, queue_name, reject_duplicates)
  end
end
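
The listing above passes its arguments to the control object in a different order than the Ruby method receives them: queue_name comes first in the Ruby signature, while id comes first in the delegated call. The sketch below makes that visible with two stand-in classes, RecordingControl and SketchQueue. Both are hypothetical and only imitate the shape of the real objects.

```ruby
# Stand-in control that records the arguments it receives. The real object
# is the queue control supplied by the messaging server.
class RecordingControl
  attr_reader :calls

  def initialize
    @calls = []
  end

  def move_message(id, queue_name, reject_duplicates)
    @calls << [id, queue_name, reject_duplicates]
    true
  end
end

# Stand-in queue whose move_message has the same shape as the listing above.
class SketchQueue
  def initialize(control)
    @control = control
  end

  def move_message(queue_name, id, reject_duplicates = false)
    with_queue_control do |control|
      control.move_message(id, queue_name, reject_duplicates)
    end
  end

  private

  # Yields the injected control instead of looking one up in a registry.
  def with_queue_control
    yield @control
  end
end

control = RecordingControl.new
moved = SketchQueue.new(control).move_message("/queues/archive", "ID:123")
```

The queue name and message ID used here are placeholders. The block's return value becomes the method's return value, which is how the Boolean result reaches the caller.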

#move_messages(queue_name, filter = nil, reject_duplicates = false) ⇒ Fixnum

Moves messages from this queue to the queue named by the queue_name parameter. The optional reject_duplicates parameter specifies whether duplicates should be rejected.

Parameters:

  • queue_name (String)

    The name of the queue to move the messages to

  • filter (String) (defaults to: nil)

    Filter that limits which messages are moved. If nil or an empty string is provided, *all messages* will be moved.

  • reject_duplicates (Boolean) (defaults to: false)

    Specifies whether duplicates should be rejected

Returns:

  • (Fixnum)

    The number of moved messages



# File 'lib/torquebox/messaging/queue.rb', line 212

def move_messages(queue_name, filter = nil, reject_duplicates = false)
  with_queue_control do |control|
    control.move_messages(filter, queue_name, reject_duplicates)
  end
end

#pause ⇒ void

This method returns an undefined value.

Pauses a queue.

Messages put into a paused queue will not be delivered, even if there are connected consumers.

When executed on a paused queue, nothing happens.



# File 'lib/torquebox/messaging/queue.rb', line 90

def pause
  with_queue_control do |control|
    control.pause
  end
end

#paused? ⇒ Boolean

Returns true if the queue is paused, false otherwise.

Returns:

  • (Boolean)


# File 'lib/torquebox/messaging/queue.rb', line 76

def paused?
  with_queue_control do |control|
    control.is_paused
  end
end

#publish_and_receive(message, options = {}) ⇒ Object

Publishes a message and waits for the reply.

Parameters:

  • message

    The message to publish

  • options (defaults to: {})

    Optional parameters (a Hash)

Returns:

  • Replied message



# File 'lib/torquebox/messaging/queue.rb', line 54

def publish_and_receive(message, options={})
  result = nil
  with_session do |session|
    result = session.publish_and_receive(self, message,
                                         normalize_options(options))
  end
  result
end

#receive_and_publish(options = {}, &block) ⇒ void

This method returns an undefined value.

Waits for a message and replies.

Parameters:

  • options (defaults to: {})

    Optional parameters (a Hash)

  • block

    The block to handle the received message. The return value of the block will be sent back to the queue.



# File 'lib/torquebox/messaging/queue.rb', line 68

def receive_and_publish(options={}, &block)
  with_session do |session|
    session.receive_and_publish(self, normalize_options(options), &block)
  end
end
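
publish_and_receive and receive_and_publish form the two halves of a request/reply exchange: one side publishes and blocks for the answer, the other receives and replies with its block's return value. The sketch below imitates that flow in memory with Ruby's Thread::Queue. RequestReplySketch is a hypothetical stand-in, takes no options, and makes no claim about how TorqueBox implements the exchange.

```ruby
# In-memory stand-in for a request/reply queue (NOT the TorqueBox
# implementation). Each request carries its own one-shot reply box.
class RequestReplySketch
  def initialize
    @requests = Thread::Queue.new
  end

  # Publishes a message and blocks until a responder has replied.
  def publish_and_receive(message)
    reply_box = Thread::Queue.new
    @requests << [message, reply_box]
    reply_box.pop
  end

  # Waits for one message and sends the block's return value back.
  def receive_and_publish
    message, reply_box = @requests.pop
    reply_box << yield(message)
    nil
  end
end

queue = RequestReplySketch.new

# The responder runs in its own thread, as a separate consumer would.
responder = Thread.new do
  queue.receive_and_publish { |message| message.upcase }
end

reply = queue.publish_and_receive("ping")
responder.join
```

Here the requester sends "ping" and the responder's block returns the upcased message, which becomes the reply.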

#remove_message(id) ⇒ Boolean

Removes a message from the queue by its ID.

Parameters:

  • id (String)

    ID of the message

Returns:

  • (Boolean)

    true if the message was removed, false otherwise.



# File 'lib/torquebox/messaging/queue.rb', line 125

def remove_message(id)
  with_queue_control do |control|
    control.remove_message(id)
  end
end

#remove_messages(filter = nil) ⇒ Integer

Removes messages from the queue.

Examples:

Remove messages with the type property set to ‘tomato’ or ‘garlic’


@queue.remove_messages("type = 'tomato' OR type = 'garlic'")

Parameters:

  • filter (String) (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set, all messages will be removed.

Returns:

  • (Integer)

    Number of removed messages



# File 'lib/torquebox/messaging/queue.rb', line 115

def remove_messages(filter = nil)
  with_queue_control do |control|
    control.remove_messages(filter)
  end
end

#resume ⇒ void

This method returns an undefined value.

Resumes a queue after it was paused. When executed on an active queue, nothing happens.



# File 'lib/torquebox/messaging/queue.rb', line 100

def resume
  with_queue_control do |control|
    control.resume
  end
end
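
The documented behaviour of pause, paused? and resume can be pictured with a small in-memory model. PausableSketch below is a hypothetical stand-in, not the TorqueBox implementation. It assumes that messages held while the queue is paused are delivered once it is resumed, and it treats repeated pause or resume calls as no-ops, as the descriptions state.

```ruby
# Toy model of a pausable queue (NOT the TorqueBox implementation).
# Assumption: messages held during a pause are delivered on resume.
class PausableSketch
  attr_reader :delivered

  def initialize
    @paused = false
    @pending = []
    @delivered = []
  end

  def paused?
    @paused
  end

  # Pausing an already paused queue changes nothing.
  def pause
    @paused = true
    nil
  end

  # Resuming an active queue changes nothing.
  def resume
    @paused = false
    flush
    nil
  end

  def publish(message)
    @pending << message
    flush unless @paused
    nil
  end

  private

  # Hands every pending message to the (imaginary) consumers.
  def flush
    @delivered.concat(@pending)
    @pending.clear
  end
end

queue = PausableSketch.new
queue.publish("a")
queue.pause
queue.pause
queue.publish("b")
held_while_paused = queue.delivered.dup
queue.resume
queue.resume
```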

#scheduled_messages_count ⇒ Fixnum

Returns the number of scheduled messages for this queue.

Returns:

  • (Fixnum)

    The number of scheduled messages



# File 'lib/torquebox/messaging/queue.rb', line 198

def scheduled_messages_count
  with_queue_control do |control|
    control.scheduled_count
  end
end

#send_message_to_dead_letter_address(id) ⇒ Boolean

Sends the message with the given ID to the dead letter address.

Returns:

  • (Boolean)

    Returns true if the message was sent, false otherwise.



# File 'lib/torquebox/messaging/queue.rb', line 169

def send_message_to_dead_letter_address(id)
  with_queue_control do |control|
    control.send_message_to_dead_letter_address(id)
  end
end

#send_messages_to_dead_letter_address(filter = nil) ⇒ Fixnum

Sends messages to the dead letter address.

Parameters:

  • filter (String) (defaults to: nil)

    Expression in a SQL92-like syntax based on properties set on the messages. If not set, all messages will be sent.

Returns:

  • (Fixnum)

    The number of sent messages



# File 'lib/torquebox/messaging/queue.rb', line 180

def send_messages_to_dead_letter_address(filter = nil)
  with_queue_control do |control|
    control.send_messages_to_dead_letter_address(filter)
  end
end

#to_s ⇒ Object



# File 'lib/torquebox/messaging/queue.rb', line 296

def to_s
  "[Queue: #{super}]"
end

#with_queue_control ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Retrieves the JMSQueueControl implementation for the current queue.



# File 'lib/torquebox/messaging/queue.rb', line 304

def with_queue_control
  TorqueBox::ServiceRegistry.lookup("jboss.messaging.default") do |server|
    yield server.management_service.get_resource("jms.queue.#{@name}")
  end
end