Class: JMS::Connection
- Inherits: Object
- Defined in: lib/jms/connection.rb
Overview
Every JMS session must have at least one Connection instance. A Connection instance represents a connection between this client application and the JMS Provider (server/queue manager/broker). A connection is distinct from a Session in that multiple Sessions can share a single connection. Also, unit of work control (commit/rollback) is performed at the Session level.
Since many JRuby applications will only have one connection and one session, several convenience methods have been added to create both the Session and Connection objects automatically.
For example, to read a message from a queue, if one is available, and then terminate:
require 'rubygems'
require 'jms'

JMS::Connection.session(
  :factory => 'org.apache.activemq.ActiveMQConnectionFactory',
  :broker_url => 'tcp://localhost:61616',
  :require_jars => [
    '~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar',
    '~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar',
    '~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar',
  ]
) do |session|
  session.consumer(:queue_name=>'TEST') do |consumer|
    # receive_no_wait returns nil when no message is waiting
    if message = consumer.receive_no_wait
      puts "Data Received: #{message.data}"
    else
      puts 'No message available'
    end
  end
end
The above code creates a Connection and then a Session. Once the block completes, the Session is closed and the Connection is disconnected.
See: download.oracle.com/javaee/6/api/javax/jms/Connection.html
Class Method Summary
- .session(params = {}, &proc) ⇒ Object
  Connect to a JMS Broker, create and start the session, then call the code block passing in the session.
- .start(params = {}, &proc) ⇒ Object
  Create a connection to the JMS provider, start the connection, call the supplied code block, then close the connection upon completion.
Instance Method Summary
- #client_id ⇒ Object
  Gets the client identifier for this connection.
- #client_id=(client_id) ⇒ Object
  Sets the client identifier for this connection.
- #close ⇒ Object
  Close the connection with the JMS Provider. First closes any consumers or sessions that are active as a result of JMS::Connection::on_message.
- #create_session(params = {}) ⇒ Object
  Create a session over this connection.
- #create_session_pool(params = {}) ⇒ Object
  Since a Session can only be used by one thread at a time, we could create a Session for every thread.
- #exception_listener ⇒ Object
  Returns the ExceptionListener object for this connection. The returned class implements the interface JMS::ExceptionListener.
- #exception_listener=(listener) ⇒ Object
  Sets an exception listener for this connection. See ::on_exception to set a Ruby listener. Returns: nil.
- #fetch_dependencies(jar_list) ⇒ Object
  Load the required jar files for this JMS Provider and load JRuby extensions for those classes.
- #initialize(params = {}) ⇒ Connection (constructor)
  Create a connection to the JMS provider.
- #meta_data ⇒ Object
  Gets the metadata for this connection. See: download.oracle.com/javaee/6/api/javax/jms/ConnectionMetaData.html.
- #on_exception(&block) ⇒ Object
  Whenever an exception occurs the supplied block is called. This is important when Connection::on_message has been used, since failures to the connection would be lost otherwise.
- #on_message(params, &block) ⇒ Object
  Receive messages in a separate thread when they arrive.
- #on_message_statistics ⇒ Object
  Return the statistics for every active Connection#on_message consumer in an Array.
- #session(params = {}, &proc) ⇒ Object
  Create a session over this connection.
- #start ⇒ Object
  Start (or restart) delivery of incoming messages over this connection.
- #stop ⇒ Object
  Temporarily stop delivery of incoming messages on this connection. Useful during a hot code update or other changes that need to be completed without any new messages being processed. Call start() to resume receiving messages.
- #to_s ⇒ Object
  Return a string describing the JMS provider and version.
Constructor Details
#initialize(params = {}) ⇒ Connection
Create a connection to the JMS provider.
Note: Connection::start must be called before any consumers will be able to receive messages.
In JMS we need to start by obtaining the JMS Factory class that is supplied by the JMS Vendor.
There are 3 ways to establish a connection to a JMS Provider:
1. Supply the name of the JMS Provider's Factory class
2. Supply the JMS Provider Factory class itself
3. Use a JNDI lookup to return the JMS Provider Factory
Parameters:
  :factory      => String: Name of JMS Provider Factory class
                => Class: JMS Provider Factory class itself
  :jndi_name    => String: Name of JNDI entry at which the Factory can be found
  :jndi_context => Mandatory if a JNDI lookup is being used; contains details
                   on how to connect to the JNDI server etc.
  :require_jars => An optional array of Jar file names to load for the specified
                   JMS provider. By using this option it is not necessary
                   to put all the JMS Provider specific jar files into the
                   environment variable CLASSPATH prior to starting JRuby
  :username     => Username to connect to JMS provider with
  :password     => Password to use when connecting to the JMS provider
                   Note: :password is ignored if :username is not supplied
Supply either :factory or :jndi_name, not both. If both are supplied, :factory takes precedence and :jndi_name is ignored.
JMS Provider specific properties can be set if the JMS Factory itself has setters for those properties.
For some known examples, see: [Example jms.yml](github.com/reidmorrison/jruby-jms/blob/master/examples/jms.yml)
# File 'lib/jms/connection.rb', line 167
def initialize(params = {})
  # Used by ::on_message
  @sessions = []
  @consumers = []

  options = params.dup

  # Load Jar files on demand so that they do not need to be in the CLASSPATH
  # of JRuby lib directory
  fetch_dependencies(options.delete(:require_jars))

  connection_factory = nil
  factory = options.delete(:factory)
  if factory
    # If factory is a string, then it is the name of a class, not the class itself
    factory = eval(factory) if factory.respond_to? :to_str
    connection_factory = factory.new
  elsif jndi_name = options[:jndi_name]
    raise "Missing mandatory parameter :jndi_context missing in call to Connection::connect" unless jndi_context = options[:jndi_context]
    jndi = javax.naming.InitialContext.new(java.util.Hashtable.new(jndi_context))
    begin
      connection_factory = jndi.lookup jndi_name
    ensure
      jndi.close
    end
  else
    raise "Missing mandatory parameter :factory or :jndi_name missing in call to Connection::connect"
  end
  options.delete(:jndi_name)
  options.delete(:jndi_context)

  JMS::logger.debug "Using Factory: #{connection_factory.java_class}" if connection_factory.respond_to? :java_class
  options.each_pair do |key, val|
    next if [:username, :password].include?(key)

    method = key.to_s+'='
    if connection_factory.respond_to? method
      connection_factory.send method, val
      JMS::logger.debug " #{key} = #{connection_factory.send key.to_sym}" if connection_factory.respond_to? key.to_sym
    else
      JMS::logger.warn "#{connection_factory.java_class} does not understand option: :#{key}=#{val}, ignoring :#{key}" if connection_factory.respond_to? :java_class
    end
  end

  # Check for username and password
  if options[:username]
    @jms_connection = connection_factory.create_connection(options[:username], options[:password])
  else
    @jms_connection = connection_factory.create_connection
  end
end
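The rule that provider-specific properties are applied only when the factory exposes a matching setter can be pictured with a small plain-Ruby sketch. `StubFactory` and `apply_factory_options` are hypothetical names used only for illustration; they are not part of jruby-jms, and a real factory would be a Java class supplied by the JMS vendor.

```ruby
# Hypothetical stand-in for a vendor connection factory; a real one is a Java class.
StubFactory = Struct.new(:broker_url, :client_id)

# Apply each option through its setter when the factory has one.
# Returns the keys that were skipped because no setter exists.
# :username and :password are left alone, since they are passed to
# create_connection rather than set on the factory.
def apply_factory_options(factory, options)
  ignored = []
  options.each_pair do |key, value|
    next if [:username, :password].include?(key)

    setter = "#{key}="
    if factory.respond_to?(setter)
      factory.send(setter, value)
    else
      ignored << key
    end
  end
  ignored
end

factory = StubFactory.new
ignored = apply_factory_options(
  factory,
  :broker_url     => 'tcp://localhost:61616',
  :username       => 'guest',
  :no_such_option => 42
)
puts factory.broker_url
puts ignored.inspect
```

Under these assumptions an unknown key is skipped rather than raising, which matches the warn-and-ignore behaviour described for the constructor.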
Class Method Details
.session(params = {}, &proc) ⇒ Object
Connect to a JMS Broker, create and start the session, then call the code block passing in the session. Both the Session and Connection are closed on termination of the block.
Shortcut convenience method to both connect to the broker and create a session. Useful when only a single session is required in the current thread.
Note: It is important that each thread have its own session to support transactions.
This method will also start the session immediately so that any consumers using this session will start immediately.
# File 'lib/jms/connection.rb', line 83
def self.session(params = {}, &proc)
  self.start(params) do |connection|
    connection.session(params, &proc)
  end
end
.start(params = {}, &proc) ⇒ Object
Create a connection to the JMS provider, start the connection, call the supplied code block, then close the connection upon completion.
Returns the result of the supplied block.
# File 'lib/jms/connection.rb', line 62
def self.start(params = {}, &proc)
  raise "Missing mandatory Block when calling JMS::Connection.start" unless proc
  connection = Connection.new(params)
  connection.start
  begin
    proc.call(connection)
  ensure
    connection.close
  end
end
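The start, yield, always-close shape of this method can be tried without a broker. The sketch below is plain Ruby; `StubConnection` and `with_connection` are hypothetical stand-ins, not jruby-jms classes, and only record what was called.

```ruby
# Hypothetical stand-in for a connection; it only records state changes.
class StubConnection
  attr_reader :events

  def initialize
    @events = []
  end

  def start
    @events << :start
  end

  def close
    @events << :close
  end
end

# Start the connection, yield it, and always close it afterwards.
# The block's result is returned to the caller.
def with_connection(connection)
  raise ArgumentError, 'a block is required' unless block_given?

  connection.start
  begin
    yield connection
  ensure
    connection.close
  end
end

ok = StubConnection.new
result = with_connection(ok) { |_conn| :done }

failed = StubConnection.new
error_message = nil
begin
  with_connection(failed) { |_conn| raise 'boom' }
rescue RuntimeError => e
  error_message = e.message
end
```

Because the close sits in an `ensure` clause, the stand-in connection is closed on both the normal and the failing path, and the exception still reaches the caller.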
Instance Method Details
#client_id ⇒ Object
Gets the client identifier for this connection.
# File 'lib/jms/connection.rb', line 326
def client_id
  @jms_connection.getClientID
end
#client_id=(client_id) ⇒ Object
Sets the client identifier for this connection.
# File 'lib/jms/connection.rb', line 331
def client_id=(client_id)
  @jms_connection.setClientID(client_id)
end
#close ⇒ Object
Close the connection with the JMS Provider. First closes any consumers or sessions that are active as a result of JMS::Connection::on_message.
# File 'lib/jms/connection.rb', line 315
def close
  @consumers.each {|consumer| consumer.close } if @consumers
  @consumers = []

  @sessions.each {|session| session.close} if @sessions
  @sessions = []

  @jms_connection.close if @jms_connection
end
#create_session(params = {}) ⇒ Object
Create a session over this connection. It is recommended to create a separate session for each thread.
Note: Remember to call close on the returned session when it is no longer needed. Prefer JMS::Connection#session with a block whenever possible.
Parameters:
  :transacted => true or false
      Determines whether transactions are supported within this session.
      I.e. Whether commit or rollback can be called
      Default: false
      Note: :options below are ignored if this value is set to true
  :options => any of the JMS::Session constants:
      Note: :options are ignored if :transacted => true
      JMS::Session::AUTO_ACKNOWLEDGE
          With this acknowledgment mode, the session automatically acknowledges
          a client's receipt of a message either when the session has successfully
          returned from a call to receive or when the message listener the session has
          called to process the message successfully returns.
      JMS::Session::CLIENT_ACKNOWLEDGE
          With this acknowledgment mode, the client acknowledges a consumed
          message by calling the message's acknowledge method.
      JMS::Session::DUPS_OK_ACKNOWLEDGE
          This acknowledgment mode instructs the session to lazily acknowledge
          the delivery of messages.
      JMS::Session::SESSION_TRANSACTED
          This value is returned from the method getAcknowledgeMode if the
          session is transacted.
      Default: JMS::Session::AUTO_ACKNOWLEDGE
# File 'lib/jms/connection.rb', line 307
def create_session(params={})
  transacted = params[:transacted] || false
  options = params[:options] || JMS::Session::AUTO_ACKNOWLEDGE
  @jms_connection.create_session(transacted, options)
end
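How :transacted and :options interact can be sketched in plain Ruby. The `AckMode` module and `resolve_session_params` helper are hypothetical and exist only for this illustration; the integer values mirror the constants on javax.jms.Session, and the helper returns the mode a session would be expected to report, not anything jruby-jms itself computes.

```ruby
# Values mirror the integer constants defined on javax.jms.Session.
module AckMode
  SESSION_TRANSACTED  = 0
  AUTO_ACKNOWLEDGE    = 1
  CLIENT_ACKNOWLEDGE  = 2
  DUPS_OK_ACKNOWLEDGE = 3
end

# Hypothetical helper: returns [transacted, mode the session would report].
# When :transacted is true the requested :options value has no effect.
def resolve_session_params(params = {})
  transacted = params[:transacted] || false
  requested  = params[:options] || AckMode::AUTO_ACKNOWLEDGE
  effective  = transacted ? AckMode::SESSION_TRANSACTED : requested
  [transacted, effective]
end

defaults   = resolve_session_params
client_ack = resolve_session_params(:options => AckMode::CLIENT_ACKNOWLEDGE)
transacted = resolve_session_params(:transacted => true,
                                    :options => AckMode::CLIENT_ACKNOWLEDGE)
```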
#create_session_pool(params = {}) ⇒ Object
Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.
Each thread can request a session and then return it once it is no longer needed by that thread. The only way to get a session is to pass a block so that the Session is automatically returned to the pool upon completion of the block.
Parameters:
  see regular session parameters from: JMS::Connection#initialize
Additional parameters for controlling the session pool itself:
  :pool_name          Name of the pool as it shows up in the logger.
                      Default: 'JMS::SessionPool'
  :pool_size          Maximum Pool Size. Default: 10
                      The pool only grows as needed and will never exceed
                      :pool_size
  :pool_warn_timeout  Number of seconds to wait before logging a warning when a
                      session in the pool is not available.
                      Default: 5.0
  :pool_logger        Supply a logger that responds to #debug, #info, #warn and #debug?
                      For example: Rails.logger
                      Default: None
Example:
  session_pool = connection.create_session_pool(config)
  session_pool.session do |session|
    producer.send(session.message("Hello World"))
  end
# File 'lib/jms/connection.rb', line 514
def create_session_pool(params={})
  require 'jms/session_pool' unless defined? JMS::SessionPool
  JMS::SessionPool.new(self, params)
end
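The idea of a pool that grows on demand, never exceeds its maximum size, and hands out sessions only through a block can be sketched in plain Ruby. `TinyPool` is a hypothetical illustration and not the JMS::SessionPool implementation; it pools arbitrary objects built by the supplied block and leaves out logging and timeouts.

```ruby
require 'thread'

# Hypothetical block-scoped pool: lazily creates up to `size` resources.
class TinyPool
  attr_reader :created

  def initialize(size, &factory)
    @size    = size
    @factory = factory
    @idle    = Queue.new   # resources waiting to be reused
    @created = 0
    @lock    = Mutex.new
  end

  # Borrow a resource for the duration of the block, then return it.
  def session
    resource = checkout
    begin
      yield resource
    ensure
      @idle.push(resource)
    end
  end

  private

  def checkout
    begin
      return @idle.pop(true)          # non-blocking; raises ThreadError if empty
    rescue ThreadError
      # nothing idle: fall through and either grow the pool or wait
    end

    may_create = @lock.synchronize do
      @created < @size ? (@created += 1; true) : false
    end
    may_create ? @factory.call : @idle.pop   # blocking pop waits for a return
  end
end

pool   = TinyPool.new(2) { Object.new }
first  = pool.session { |s| s }
second = pool.session { |s| s }
nested_distinct = pool.session { |a| pool.session { |b| !a.equal?(b) } }
```

Sequential callers reuse the same object, while the nested call forces the pool to grow to its second resource.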
#exception_listener ⇒ Object
Returns the ExceptionListener object for this connection. The returned class implements the interface JMS::ExceptionListener.
# File 'lib/jms/connection.rb', line 337
def exception_listener
  @jms_connection.getExceptionListener
end
#exception_listener=(listener) ⇒ Object
Sets an exception listener for this connection. See ::on_exception to set a Ruby listener. Returns: nil.
# File 'lib/jms/connection.rb', line 344
def exception_listener=(listener)
  @jms_connection.setExceptionListener(listener)
end
#fetch_dependencies(jar_list) ⇒ Object
Load the required jar files for this JMS Provider and load JRuby extensions for those classes.
Rather than copying the JMS jar files into the JRuby lib, load them on demand. JRuby JMS extensions are only loaded once the jar files have been loaded.
Can be called multiple times if required, although it would not be performant to do so regularly.
Parameter: jar_list is an Array of the paths and filenames of the jar files to load for this JMS Provider.
Returns nil
TODO make this a class method
# File 'lib/jms/connection.rb', line 105
def fetch_dependencies(jar_list)
  jar_list.each do |jar|
    JMS::logger.debug "Loading Jar File:#{jar}"
    begin
      require jar
    rescue Exception => exc
      JMS::logger.error "Failed to Load Jar File:#{jar}. #{exc.to_s}"
    end
  end if jar_list

  require 'jms/mq_workaround'
  require 'jms/imports'
  require 'jms/message_listener_impl'
  require 'jms/message'
  require 'jms/text_message'
  require 'jms/map_message'
  require 'jms/bytes_message'
  require 'jms/object_message'
  require 'jms/session'
  require 'jms/message_consumer'
  require 'jms/message_producer'
  require 'jms/queue_browser'
  require 'jms/oracle_a_q_connection_factory'
end
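The load-on-demand loop keeps going when one file fails to load. A minimal plain-Ruby sketch of that behaviour follows; `load_on_demand` is a hypothetical helper, it rescues only LoadError (narrower than the listing above), and it collects failures in a Hash instead of logging them.

```ruby
# Hypothetical helper: try to require each entry, keep going on failure,
# and report which entries could not be loaded.
def load_on_demand(paths)
  failures = {}
  Array(paths).each do |path|   # Array(nil) is [], so a nil list is a no-op
    begin
      require path
    rescue LoadError => e
      failures[path] = e.message
    end
  end
  failures
end

# 'set' is part of the standard library; the second path is assumed not to exist.
failures = load_on_demand(['set', '/no/such/dir/missing-provider.jar'])
puts failures.keys.inspect
```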
#meta_data ⇒ Object
Gets the metadata for this connection. See: download.oracle.com/javaee/6/api/javax/jms/ConnectionMetaData.html
# File 'lib/jms/connection.rb', line 367
def meta_data
  @jms_connection.getMetaData
end
#on_exception(&block) ⇒ Object
Whenever an exception occurs the supplied block is called. This is important when Connection::on_message has been used, since failures to the connection would be lost otherwise.
For details on the supplied parameter when the block is called, see: download.oracle.com/javaee/6/api/javax/jms/JMSException.html
Example:
  connection.on_exception do |jms_exception|
    puts "JMS Exception has occurred: #{jms_exception}"
  end
Returns: nil
# File 'lib/jms/connection.rb', line 361
def on_exception(&block)
  @jms_connection.setExceptionListener(block)
end
#on_message(params, &block) ⇒ Object
Receive messages in a separate thread when they arrive.
Allows messages to be received asynchronously in a separate thread. This method returns to the caller before messages are processed. It is then the caller's responsibility to keep the program active so that messages can be processed.
Session Parameters:
  :transacted => true or false
      Determines whether transactions are supported within this session.
      I.e. Whether commit or rollback can be called
      Default: false
      Note: :options below are ignored if this value is set to true
  :options => any of the JMS::Session constants:
      Note: :options are ignored if :transacted => true
      JMS::Session::AUTO_ACKNOWLEDGE
          With this acknowledgment mode, the session automatically acknowledges
          a client's receipt of a message either when the session has successfully
          returned from a call to receive or when the message listener the session has
          called to process the message successfully returns.
      JMS::Session::CLIENT_ACKNOWLEDGE
          With this acknowledgment mode, the client acknowledges a consumed
          message by calling the message's acknowledge method.
      JMS::Session::DUPS_OK_ACKNOWLEDGE
          This acknowledgment mode instructs the session to lazily acknowledge
          the delivery of messages.
      JMS::Session::SESSION_TRANSACTED
          This value is returned from the method getAcknowledgeMode if the
          session is transacted.
      Default: JMS::Session::AUTO_ACKNOWLEDGE
  :session_count => Number of sessions to create, each with their own consumer which
      in turn will call the supplied code block.
      Note: The supplied block must be thread safe since it will be called
            by several threads at the same time.
            I.e. Don't change instance variables etc. without the
            necessary semaphores etc.
      Default: 1
Consumer Parameters:
  :queue_name => String: Name of the Queue to return
                 Symbol: :temporary => Create temporary queue
                 Mandatory unless :topic_name is supplied
    Or,
  :topic_name => String: Name of the Topic to write to or subscribe to
                 Symbol: :temporary => Create temporary topic
                 Mandatory unless :queue_name is supplied
    Or,
  :destination => Explicit javaxJms::Destination to use
  :selector => Filter which messages should be returned from the queue
               Default: All messages
  :no_local => Determine whether messages published by its own connection
               should be delivered to the supplied block
               Default: false
  :statistics => Capture statistics on how many messages have been read
      true : This method will capture statistics on the number of messages received
             and the time it took to process them.
             The timer starts when each() is called and finishes when either the last message was received,
             or when Destination::statistics is called. In this case MessageConsumer::statistics
             can be called several times during processing without affecting the end time.
             Also, the start time and message count are not reset until MessageConsumer::each
             is called again with :statistics => true
Usage: For transacted sessions the block supplied must return either true or false:
  true => The session is committed
  false => The session is rolled back
  Any Exception => The session is rolled back
Note: Separately invoke Connection#on_exception so that connection failures can be handled,
      since on_message will not be called if the connection is lost
# File 'lib/jms/connection.rb', line 452
def on_message(params, &block)
  raise "JMS::Connection must be connected prior to calling JMS::Connection::on_message" unless @sessions && @consumers

  consumer_count = params[:session_count] || 1
  consumer_count.times do
    session = self.create_session(params)
    consumer = session.consumer(params)
    if session.transacted?
      consumer.on_message(params) do |message|
        begin
          block.call(message) ? session.commit : session.rollback
        rescue => exc
          session.rollback
          # Re-raise so the failure is not silently swallowed
          raise exc
        end
      end
    else
      consumer.on_message(params, &block)
    end
    @consumers << consumer
    @sessions << session
  end
end
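The commit and rollback rule for transacted sessions (true commits, false rolls back, an exception rolls back) can be exercised without a broker. In this plain-Ruby sketch `StubSession` and `process_transacted` are hypothetical names; the stub only records which call was made.

```ruby
# Hypothetical stand-in for a transacted session; it only records calls.
class StubSession
  attr_reader :calls

  def initialize
    @calls = []
  end

  def commit
    @calls << :commit
  end

  def rollback
    @calls << :rollback
  end
end

# Run the handler for one message and settle the transaction:
# a truthy result commits, a falsy result rolls back, and an exception
# rolls back before being re-raised to the caller.
def process_transacted(session, message)
  yield(message) ? session.commit : session.rollback
rescue StandardError
  session.rollback
  raise
end

committed = StubSession.new
process_transacted(committed, 'order-1') { |msg| !msg.empty? }

rolled_back = StubSession.new
process_transacted(rolled_back, '') { |msg| !msg.empty? }

failed = StubSession.new
begin
  process_transacted(failed, 'order-2') { |_msg| raise ArgumentError, 'bad payload' }
rescue ArgumentError
  # the error still reaches the caller after the rollback
end
```

A handler that returns nil counts as false here, so a block whose last expression is something like `puts` would roll back every message under this rule.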
#on_message_statistics ⇒ Object
Return the statistics for every active Connection#on_message consumer in an Array.
For details on the contents of each element in the array, see: Consumer#on_message_statistics
# File 'lib/jms/connection.rb', line 480
def on_message_statistics
  @consumers.collect { |consumer| consumer.on_message_statistics }
end
#session(params = {}, &proc) ⇒ Object
Create a session over this connection. It is recommended to create a separate session for each thread. If a block of code is passed in, it will be called and then the session is automatically closed on completion of the code block.
Parameters:
  :transacted => true or false
      Determines whether transactions are supported within this session.
      I.e. Whether commit or rollback can be called
      Default: false
      Note: :options below are ignored if this value is set to true
  :options => any of the JMS::Session constants:
      Note: :options are ignored if :transacted => true
      JMS::Session::AUTO_ACKNOWLEDGE
          With this acknowledgment mode, the session automatically acknowledges
          a client's receipt of a message either when the session has successfully
          returned from a call to receive or when the message listener the session has
          called to process the message successfully returns.
      JMS::Session::CLIENT_ACKNOWLEDGE
          With this acknowledgment mode, the client acknowledges a consumed
          message by calling the message's acknowledge method.
      JMS::Session::DUPS_OK_ACKNOWLEDGE
          This acknowledgment mode instructs the session to lazily acknowledge
          the delivery of messages.
      JMS::Session::SESSION_TRANSACTED
          This value is returned from the method getAcknowledgeMode if the
          session is transacted.
      Default: JMS::Session::AUTO_ACKNOWLEDGE
# File 'lib/jms/connection.rb', line 265
def session(params={}, &proc)
  raise "Missing mandatory Block when calling JMS::Connection#session" unless proc
  session = self.create_session(params)
  begin
    proc.call(session)
  ensure
    session.close
  end
end
#start ⇒ Object
Start (or restart) delivery of incoming messages over this connection. By default no messages are delivered until this method is called explicitly. Delivery of messages to any asynchronous Destination::each() call will only start after Connection::start is called, or Connection.start is used.
# File 'lib/jms/connection.rb', line 223
def start
  @jms_connection.start
end
#stop ⇒ Object
Temporarily stop delivery of incoming messages on this connection. Useful during a hot code update or other changes that need to be completed without any new messages being processed. Call start() to resume receiving messages.
# File 'lib/jms/connection.rb', line 231
def stop
  @jms_connection.stop
end
#to_s ⇒ Object
Return a string describing the JMS provider and version.
# File 'lib/jms/connection.rb', line 372
def to_s
  md = @jms_connection.getMetaData
  "JMS::Connection provider: #{md.getJMSProviderName} v#{md.getProviderVersion}, JMS v#{md.getJMSVersion}"
end