Module: javaxjavax.jms::Session
- Defined in:
- lib/jms/javax_jms_session.rb
Overview
For each thread that will be processing messages concurrently a separate session is required. All sessions can share a single connection to the same JMS Provider.
Interface javax.jms.Session
See: download.oracle.com/javaee/6/api/javax/jms/Session.html
Other methods still directly accessible through this class:
create_browser(queue, message_selector)
Creates a QueueBrowser object to peek at the messages on the specified queue using a message selector.
create_bytes_message()
Creates a BytesMessage object
create_consumer(destination)
Creates a MessageConsumer for the specified destination
See: Connection::consumer
Example:
destination = session.create_destination(:queue_name => "MyQueue")
session.create_consumer(destination)
create_consumer(destination, message_selector)
Creates a MessageConsumer for the specified destination, using a message selector
create_consumer(destination, message_selector, boolean NoLocal)
Creates MessageConsumer for the specified destination, using a message selector
create_durable_subscriber(Topic topic, java.lang.String name)
Creates a durable subscriber to the specified topic
create_durable_subscriber(Topic topic, java.lang.String name, java.lang.String messageSelector, boolean noLocal)
Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages published by its own connection should be delivered to it.
create_map_Message()
Creates a MapMessage object
create_message()
Creates a Message object
create_object_message()
Creates an ObjectMessage object
create_object_message(java.io.Serializable object)
Creates an initialized ObjectMessage object
create_producer(destination)
Creates a MessageProducer to send to the specified destination
create_queue(queue_name)
Creates a queue identity given a Queue name
create_stream_message()
Creates a StreamMessage object
create_temporary_queue()
Creates a TemporaryQueue object
create_temporary_topic()
Creates a TemporaryTopic object
create_text_message()
Creates a TextMessage object
create_text_message(text)
Creates an initialized TextMessage object
create_topic(topic_name)
Creates a topic identity given a Topic name
acknowledge_mode()
Returns the acknowledgement mode of the session
message_listener()
Returns the session's distinguished message listener (optional).
transacted?
Indicates whether the session is in transacted mode
recover()
Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message
rollback()
Rolls back any messages done in this transaction and releases any locks currently held
message_listener=(MessageListener listener)
Sets the session's distinguished message listener (optional)
unsubscribe(name)
Unsubscribes a durable subscription that has been created by a client
Instance Method Summary collapse
-
#browse(params = {}, &proc) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found.
-
#browser(params, &proc) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!.
-
#consume(params, &proc) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic.
-
#consumer(params, &proc) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic.
-
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied.
-
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from.
-
#message(data) ⇒ Object
Create a new message instance based on the type of the data being supplied String (:to_str) => TextMessage Hash (:each_pair) => MapMessage Duck typing is used to determine the type.
-
#producer(params, &proc) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic.
-
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied.
-
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed.
-
#topic(topic_name, &proc) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied.
Instance Method Details
#browse(params = {}, &proc) ⇒ Object
Browse the specified queue, calling the Proc supplied for each message found
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
439 440 441 |
# File 'lib/jms/javax_jms_session.rb', line 439 def browse(params={}, &proc) self.browser(params) {|b| b.each(params, &proc)} end |
#browser(params, &proc) ⇒ Object
Return a browser for the destination A browser can read messages non-destructively from the queue It cannot browse Topics!
Call the Proc if supplied, then automatically close the consumer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'lib/jms/javax_jms_session.rb', line 406 def browser(params, &proc) raise "Session::browser requires a code block to be executed" unless proc destination = create_destination(params) b = nil if params[:selector] b = create_browser(destination, params[:selector]) else b = create_browser(destination) end if proc begin proc.call(b) ensure b.close b = nil end end b end |
#consume(params, &proc) ⇒ Object
Consume all messages for the destination A consumer can read messages from the queue or topic
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
:no_local => Determine whether messages published by its own connection
should be delivered to it
Default: false
:timeout Follows the rules for MQSeries:
-1 : Wait forever
0 : Return immediately if no message is available
x : Wait for x milli-seconds for a message to be received from the broker
Note: Messages may still be on the queue, but the broker has not supplied any messages
in the time interval specified
Default: 0
382 383 384 385 386 387 388 389 |
# File 'lib/jms/javax_jms_session.rb', line 382 def consume(params, &proc) c = self.consumer(params) begin c.each(params, &proc) ensure c.close end end |
#consumer(params, &proc) ⇒ Object
Return a consumer for the destination A consumer can read messages from the queue or topic
Call the Proc if supplied, then automatically close the consumer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
:no_local => Determine whether messages published by its own connection
should be delivered to it
Default: false
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/jms/javax_jms_session.rb', line 332 def consumer(params, &proc) destination = create_destination(params) c = nil if params[:no_local] c = create_consumer(destination, params[:selector] || '', params[:no_local]) elsif params[:selector] c = create_consumer(destination, params[:selector]) else c = create_consumer(destination) end if proc begin proc.call(c) ensure c.close c = nil end end c end |
#create_destination(params) ⇒ Object
Create the destination based on the parameter supplied
The idea behind this method is to allow the decision as to whether one is sending to a topic or destination to be transparent to the code. The supplied parameters can be externalized into say a YAML file so that today it writes to a queue, later it can be changed to write to a topic so that multiple parties can receive the same messages.
Note: For Temporary Queues and Topics, remember to delete them when done
or just use ::destination instead with a block and it will take care
of deleting them for you
To create a queue:
session.create_destination(:queue_name => 'name of queue')
To create a temporary queue:
session.create_destination(:queue_name => :temporary)
To create a topic:
session.create_destination(:topic_name => 'name of queue')
To create a temporary topic:
session.create_destination(:topic_name => :temporary)
Create the destination based on the parameter supplied
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
Returns the result of the supplied block
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/jms/javax_jms_session.rb', line 169 def create_destination(params) # Allow a Java JMS destination object to be passed in return params[:destination] if params[:destination] && params[:destination].java_kind_of?(javax.jms::Destination) # :queue_name is deprecated queue_name = params[:queue_name] || params[:queue_name] topic_name = params[:topic_name] raise "Missing mandatory parameter :queue_name or :topic_name to Session::producer, Session::consumer, or Session::browser" unless queue_name || topic_name if queue_name queue_name == :temporary ? create_temporary_queue : create_queue(queue_name) else topic_name == :temporary ? create_temporary_topic : create_topic(topic_name) end end |
#destination(params = {}, &block) ⇒ Object
Create a queue or topic to send or receive messages from
A block must be supplied so that if it is a temporary topic or queue it will be deleted after the proc is complete
To create a queue:
session.destination(:queue_name => 'name of queue')
To create a temporary queue:
session.destination(:queue_name => :temporary)
To create a topic:
session.destination(:topic_name => 'name of queue')
To create a temporary topic:
session.destination(:topic_name => :temporary)
Create the destination based on the parameter supplied
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
Returns the result of the supplied block
216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/jms/javax_jms_session.rb', line 216 def destination(params={}, &block) raise "Missing mandatory Block when calling JMS::Session#destination" unless block dest = nil begin dest = create_destination(params) block.call(dest) ensure # Delete Temporary Queue / Topic dest.delete if dest && dest.respond_to?(:delete) end end |
#message(data) ⇒ Object
Create a new message instance based on the type of the data being supplied
String (:to_str) => TextMessage
Hash (:each_pair) => MapMessage
Duck typing is used to determine the type. If the class responds to :to_str then it is considered a String. Similarly if it responds to :each_pair it is considered to be a Hash
117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/jms/javax_jms_session.rb', line 117 def (data) = nil if data.respond_to?(:to_str, false) = self.createTextMessage .text = data.to_str elsif data.respond_to?(:each_pair, false) = self.createMapMessage .data = data else raise "Unknown data type #{data.class.to_s} in Message" end end |
#producer(params, &proc) ⇒ Object
Return a producer for the queue name supplied A producer supports sending messages to a Queue or a Topic
Call the Proc if supplied, then automatically close the producer
Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javax.jms::Destination to use
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/jms/javax_jms_session.rb', line 295 def producer(params, &proc) destination = create_destination(params) # Call original java method with this destination #p = java_send :create_producer, [javax.jms::Destination], destination p = create_producer(destination) if proc begin proc.call(p) ensure p.close p = nil end end p end |
#queue(queue_name, &block) ⇒ Object
Return the queue matching the queue name supplied Call the Proc if supplied
230 231 232 233 234 |
# File 'lib/jms/javax_jms_session.rb', line 230 def queue(queue_name, &block) q = create_queue(queue_name) block.call(q) if block q end |
#temporary_queue(&block) ⇒ Object
Return a temporary queue The temporary queue is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/jms/javax_jms_session.rb', line 240 def temporary_queue(&block) q = create_temporary_queue if block begin block.call(q) ensure # Delete Temporary queue on completion of block q.delete if q q = nil end end q end |
#temporary_topic(&block) ⇒ Object
Return a temporary topic The temporary topic is deleted once the block completes If no block is supplied then it should be deleted by the caller when no longer needed
266 267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/jms/javax_jms_session.rb', line 266 def temporary_topic(&block) t = create_temporary_topic if block begin block.call(t) ensure # Delete Temporary topic on completion of block t.delete if t t = nil end end t end |
#topic(topic_name, &proc) ⇒ Object
Return the topic matching the topic name supplied Call the Proc if supplied
256 257 258 259 260 |
# File 'lib/jms/javax_jms_session.rb', line 256 def topic(topic_name, &proc) t = create_topic(topic_name) proc.call(t) if proc t end |