Class: JMS::Connection
- Inherits:
-
Object
- Object
- JMS::Connection
- Defined in:
- lib/jms/connection.rb
Overview
Every JMS session must have at least one Connection instance A Connection instance represents a connection between this client application and the JMS Provider (server/queue manager/broker). A connection is distinct from a Session, in that multiple Sessions can share a single connection. Also, unit of work control (commit/rollback) is performed at the Session level.
Since many JRuby applications will only have one connection and one session several convenience methods have been added to support creating both the Session and Connection objects automatically.
For Example, to read all messages from a queue and then terminate:
require 'rubygems'
require 'jms'
JMS::Connection.create_session(
:queue_manager=>'REID', # Should be :q_mgr_name
:host_name=>'localhost',
:channel=>'MY.CLIENT.CHL',
:port=>1414,
:factory => com.ibm.mq.jms.MQQueueConnectionFactory,
:transport_type => com.ibm.mq.jms.JMSC::MQJMS_TP_CLIENT_MQ_TCPIP,
:username => 'mqm'
) do |session|
session.consumer(:queue_name=>'TEST', :mode=>:input) do |consumer|
if = consumer.receive_no_wait
puts "Data Received: #{.data}"
else
puts 'No message available'
end
end
end
The above code creates a Connection and then a Session. Once the block completes the session is closed and the Connection disconnected.
See: download.oracle.com/javaee/6/api/javax/jms/Connection.html
Class Method Summary collapse
-
.session(params = {}, &proc) ⇒ Object
Connect to a JMS Broker, create a session and call the code block passing in the session Both the Session and Connection are closed on termination of the block.
-
.start(params = {}, &proc) ⇒ Object
Create a connection to the JMS provider, start the connection, call the supplied code block, then close the connection upon completion.
Instance Method Summary collapse
-
#client_id ⇒ Object
Gets the client identifier for this connection.
-
#client_id=(client_id) ⇒ Object
Sets the client identifier for this connection.
-
#close ⇒ Object
Close connection with the JMS Provider First close any consumers or sessions that are active as a result of JMS::Connection::on_message.
-
#create_session(params = {}) ⇒ Object
Create a session over this connection.
-
#exception_listener ⇒ Object
Returns the ExceptionListener object for this connection Returned class implements interface javax.jms.ExceptionListener.
-
#exception_listener=(listener) ⇒ Object
Sets an exception listener for this connection See ::on_exception to set a Ruby Listener Returns: nil.
-
#fetch_dependencies(jar_list) ⇒ Object
Load the required jar files for this JMS Provider and load JRuby extensions for those classes.
-
#initialize(params = {}) ⇒ Connection
constructor
Create a connection to the JMS provider.
-
#meta_data ⇒ Object
Gets the metadata for this connection see: download.oracle.com/javaee/6/api/javax/jms/ConnectionMetaData.html.
-
#on_exception(&block) ⇒ Object
Whenever an exception occurs the supplied block is called This is important when Connection::on_message has been used, since failures to the connection would be lost otherwise.
-
#on_message(params, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread.
- #on_message_statistics ⇒ Object
-
#session(params = {}, &proc) ⇒ Object
Create a session over this connection.
-
#start ⇒ Object
Start (or restart) delivery of incoming messages over this connection.
-
#stop ⇒ Object
Temporarily stop delivery of incoming messages on this connection Useful during a hot code update or other changes that need to be completed without any new messages being processed Call start() to resume receiving messages.
-
#to_s ⇒ Object
Return a string describing the JMS provider and version.
Constructor Details
#initialize(params = {}) ⇒ Connection
Create a connection to the JMS provider
Note: Connection::start must be called before any consumers will be
able to receive
In JMS we need to start by obtaining the JMS Factory class that is supplied by the JMS Vendor.
There are 3 ways to establish a connection to a JMS Provider
1. Supply the name of the JMS Providers Factory Class
2. Supply an instance of the JMS Provider class itself
3. Use a JNDI lookup to return the JMS Provider Factory class
Parameters:
:factory => String: Name of JMS Provider Factory class
=> Class: JMS Provider Factory class itself
:jndi_name => String: Name of JNDI entry at which the Factory can be found
:jndi_context => Mandatory if jndi lookup is being used, contains details
on how to connect to JNDI server etc.
:factory and :jndi_name are mutually exclusive, both cannot be supplied at the same time. :factory takes precedence over :jndi_name
JMS Provider specific properties can be set if the JMS Factory itself has setters for those properties. Some known examples:
For HornetQ
:factory => 'org.hornetq.jms.client.HornetQConnectionFactory',
:discovery_address => '127.0.0.1',
:discovery_port => '5445',
:username => 'guest',
:password => 'guest'
For HornetQ using JNDI lookup technique
:jndi_name => '/ConnectionFactory',
:jndi_context => {
'java.naming.factory.initial' => 'org.jnp.interfaces.NamingContextFactory',
'java.naming.provider.url' => 'jnp://localhost:1099',
'java.naming.factory.url.pkgs' => 'org.jboss.naming:org.jnp.interfaces',
'java.naming.security.principal' => 'guest',
'java.naming.security.credentials' => 'guest'
}
On Java 6, HornetQ needs the following jar files on your CLASSPATH:
hornetq-core-client.jar
netty.jar
hornetq-jms-client.jar
jboss-jms-api.jar
jnp-client.jar
On Java 5, HornetQ needs the following jar files on your CLASSPATH:
hornetq-core-client-java5.jar
netty.jar
hornetq-jms-client-java5.jar
jboss-jms-api.jar
jnp-client.jar
For: WebSphere MQ
:factory => 'com.ibm.mq.jms.MQQueueConnectionFactory',
:queue_manager=>'REID',
:host_name=>'localhost',
:channel=>'MY.CLIENT.CHL',
:port=>1414,
:transport_type => com.ibm.mq.jms.JMSC::MQJMS_TP_CLIENT_MQ_TCPIP,
:username => 'mqm'
For: Active MQ
:factory => 'org.apache.activemq.ActiveMQConnectionFactory',
:broker_url => 'tcp://localhost:61616'
ActiveMQ requires the following jar files on your CLASSPATH
For Oracle AQ 9 Server
:factory => 'JMS::OracleAQConnectionFactory',
:url => 'jdbc:oracle:thin:@hostname:1521:instanceid',
:username => 'aquser',
:password => 'mypassword'
For JBoss, which uses JNDI lookup technique
:jndi_name => 'ConnectionFactory',
:jndi_context => {
'java.naming.factory.initial' => 'org.jnp.interfaces.NamingContextFactory',
'java.naming.provider.url' => 'jnp://localhost:1099'
'java.naming.security.principal' => 'user',
'java.naming.security.credentials' => 'pwd'
}
For Apache Qpid / Redhat Messaging, using Factory class directly
:factory: org.apache.qpid.client.AMQConnectionFactory
:broker_url: tcp://localhost:5672
For Apache Qpid / Redhat Messaging, via JNDI lookup
:jndi_name => 'local',
:jndi_context => {
'java.naming.factory.initial' => 'org.apache.qpid.jndi.PropertiesFileInitialContextFactory',
'connectionfactory.local' => "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
}
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/jms/connection.rb', line 219 def initialize(params = {}) # Used by ::on_message @sessions = [] @consumers = [] # Load Jar files on demand so that they do not need to be in the CLASSPATH # of JRuby lib directory fetch_dependencies(params[:require_jars]) if params[:require_jars] connection_factory = nil factory = params[:factory] if factory # If factory is a string, then it is the name of a class, not the class itself factory = eval(factory) if factory.respond_to? :to_str connection_factory = factory.new elsif jndi_name = params[:jndi_name] raise "Missing mandatory parameter :jndi_context missing in call to Connection::connect" unless jndi_context = params[:jndi_context] jndi = javax.naming.InitialContext.new(java.util.Hashtable.new(jndi_context)) begin connection_factory = jndi.lookup jndi_name ensure jndi.close end else raise "Missing mandatory parameter :factory or :jndi_name missing in call to Connection::connect" end JMS::logger.debug "Using Factory: #{connection_factory.java_class}" if connection_factory.respond_to? :java_class params.each_pair do |key, val| method = key.to_s+'=' if connection_factory.respond_to? method connection_factory.send method, val JMS::logger.debug " #{key} = #{connection_factory.send key}" if connection_factory.respond_to? key.to_sym end end if params[:username] @jms_connection = connection_factory.create_connection(params[:username], params[:password]) else @jms_connection = connection_factory.create_connection end end |
Class Method Details
.session(params = {}, &proc) ⇒ Object
Connect to a JMS Broker, create a session and call the code block passing in the session Both the Session and Connection are closed on termination of the block
Shortcut convenience method to both connect to the broker and create a session Useful when only a single session is required in the current thread
Note: It is important that each thread have its own session to support transactions
80 81 82 83 84 |
# File 'lib/jms/connection.rb', line 80 def self.session(params = {}, &proc) self.start(params) do |connection| connection.session(params, &proc) end end |
.start(params = {}, &proc) ⇒ Object
Create a connection to the JMS provider, start the connection, call the supplied code block, then close the connection upon completion
Returns the result of the supplied block
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/jms/connection.rb', line 62 def self.start(params = {}, &proc) raise "Missing mandatory Block when calling JMS::Connection.start" unless proc connection = Connection.new(params) connection.start begin proc.call(connection) ensure connection.close end end |
Instance Method Details
#client_id ⇒ Object
Gets the client identifier for this connection.
350 351 352 |
# File 'lib/jms/connection.rb', line 350 def client_id @jms_connection.getClientID end |
#client_id=(client_id) ⇒ Object
Sets the client identifier for this connection.
355 356 357 |
# File 'lib/jms/connection.rb', line 355 def client_id=(client_id) @jms_connection.setClientID(client_id) end |
#close ⇒ Object
Close connection with the JMS Provider First close any consumers or sessions that are active as a result of JMS::Connection::on_message
339 340 341 342 343 344 345 346 347 |
# File 'lib/jms/connection.rb', line 339 def close @consumers.each {|consumer| consumer.close } if @consumers @consumers = [] @sessions.each {|session| session.close} if @sessions @session=[] @jms_connection.close if @jms_connection end |
#create_session(params = {}) ⇒ Object
Create a session over this connection. It is recommended to create separate sessions for each thread
Note: Remember to call close on the returned session when it is no longer
needed. Rather use JMS::Connection#session with a block whenever
possible
Parameters:
:transacted => true or false
Determines whether transactions are supported within this session.
I.e. Whether commit or rollback can be called
Default: false
Note: :options below is ignored if this value is set to :true
:options => any of the javax.jms.Session constants:
Note: :options is ignored of :transacted => true
javax.jms.Session::AUTO_ACKNOWLEDGE
With this acknowledgment mode, the session automatically acknowledges
a client's receipt of a message either when the session has successfully
returned from a call to receive or when the message listener the session has
called to process the message successfully returns.
javax.jms.Session::CLIENT_ACKNOWLEDGE
With this acknowledgment mode, the client acknowledges a consumed
message by calling the message's acknowledge method.
javax.jms.Session::DUPS_OK_ACKNOWLEDGE
This acknowledgment mode instructs the session to lazily acknowledge
the delivery of messages.
javax.jms.Session::SESSION_TRANSACTED
This value is returned from the method getAcknowledgeMode if the
session is transacted.
Default: javax.jms.Session::AUTO_ACKNOWLEDGE
331 332 333 334 335 |
# File 'lib/jms/connection.rb', line 331 def create_session(params={}) transacted = params[:transacted] || false = params[:options] || javax.jms.Session::AUTO_ACKNOWLEDGE @jms_connection.create_session(transacted, ) end |
#exception_listener ⇒ Object
Returns the ExceptionListener object for this connection Returned class implements interface javax.jms.ExceptionListener
361 362 363 |
# File 'lib/jms/connection.rb', line 361 def exception_listener @jms_connection.getExceptionListener end |
#exception_listener=(listener) ⇒ Object
Sets an exception listener for this connection See ::on_exception to set a Ruby Listener Returns: nil
368 369 370 |
# File 'lib/jms/connection.rb', line 368 def exception_listener=(listener) setExceptionListener(listener) end |
#fetch_dependencies(jar_list) ⇒ Object
Load the required jar files for this JMS Provider and load JRuby extensions for those classes
Rather than copying the JMS jar files into the JRuby lib, load them on demand. JRuby JMS extensions are only loaded once the jar files have been loaded.
Can be called multiple times if required, although it would not be performant to do so regularly.
Parameter: jar_list is an Array of the path and filenames to jar files
to load for this JMS Provider
Returns nil
TODO make this a class method
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/jms/connection.rb', line 102 def fetch_dependencies(jar_list) jar_list.each do |jar| JMS::logger.info "Loading Jar File:#{jar}" begin require jar rescue Exception => exc JMS::logger.error "Failed to Load Jar File:#{jar}. #{exc.to_s}" end end require 'jms/message_listener' require 'jms/javax_jms_message' require 'jms/javax_jms_text_message' require 'jms/javax_jms_map_message' require 'jms/javax_jms_object_message' require 'jms/javax_jms_session' require 'jms/javax_jms_message_consumer' require 'jms/javax_jms_queue_browser' end |
#meta_data ⇒ Object
Gets the metadata for this connection see: download.oracle.com/javaee/6/api/javax/jms/ConnectionMetaData.html
391 392 393 |
# File 'lib/jms/connection.rb', line 391 def @jms_connection.getMetaData end |
#on_exception(&block) ⇒ Object
Whenever an exception occurs the supplied block is called This is important when Connection::on_message has been used, since failures to the connection would be lost otherwise
For details on the supplied parameter when the block is called, see: download.oracle.com/javaee/6/api/javax/jms/JMSException.html
Example:
connection. do |jms_exception|
puts "JMS Exception has occurred: #{jms_exception}"
end
Returns: nil
385 386 387 |
# File 'lib/jms/connection.rb', line 385 def on_exception(&block) setExceptionListener(block) end |
#on_message(params, &proc) ⇒ Object
Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.
Session Parameters:
:transacted => true or false
Determines whether transactions are supported within this session.
I.e. Whether commit or rollback can be called
Default: false
:options => any of the javax.jms.Session constants
Default: javax.jms.Session::AUTO_ACKNOWLEDGE
:session_count : Number of sessions to create, each with their own consumer which
in turn will call the supplied Proc.
Note: The supplied Proc must be thread safe since it will be called
by several threads at the same time.
I.e. Don't change instance variables etc. without the
necessary semaphores etc.
Default: 1
Consumer Parameters:
:queue_name => String: Name of the Queue to return
Symbol: :temporary => Create temporary queue
Mandatory unless :topic_name is supplied
Or,
:topic_name => String: Name of the Topic to write to or subscribe to
Symbol: :temporary => Create temporary topic
Mandatory unless :queue_name is supplied
Or,
:destination=> Explicit javaxJms::Destination to use
:selector => Filter which messages should be returned from the queue
Default: All messages
:no_local => Determine whether messages published by its own connection
should be delivered to it
Default: false
:statistics Capture statistics on how many messages have been read
true : This method will capture statistics on the number of messages received
and the time it took to process them.
The timer starts when each() is called and finishes when either the last message was received,
or when Destination::statistics is called. In this case MessageConsumer::statistics
can be called several times during processing without affecting the end time.
Also, the start time and message count is not reset until MessageConsumer::each
is called again with :statistics => true
The statistics gathered are returned when :statistics => true and :async => false
Usage: For transacted sessions (the default) the Proc supplied must return
either true or false:
true => The session is committed
false => The session is rolled back
Any Exception => The session is rolled back
Note: Also supply connection::on_exception so that connection failures can be handled
464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 |
# File 'lib/jms/connection.rb', line 464 def (params, &proc) raise "JMS::Connection must be connected prior to calling JMS::Connection::on_message" unless @sessions && @consumers consumer_count = params[:session_count] || 1 consumer_count.times do session = self.create_session(params) consumer = session.consumer(params) if session.transacted? consumer.(params) do || begin proc.call() ? session.commit : session.rollback rescue => exc session.rollback throw exc end end else consumer.(params, &proc) end @consumers << consumer @sessions << session end end |
#on_message_statistics ⇒ Object
488 489 490 |
# File 'lib/jms/connection.rb', line 488 def @consumers.collect{|consumer| consumer.} end |
#session(params = {}, &proc) ⇒ Object
Create a session over this connection. It is recommended to create separate sessions for each thread If a block of code is passed in, it will be called and then the session is automatically closed on completion of the code block
Parameters:
:transacted => true or false
Determines whether transactions are supported within this session.
I.e. Whether commit or rollback can be called
Default: false
:options => any of the javax.jms.Session constants
Default: javax.jms.Session::AUTO_ACKNOWLEDGE
290 291 292 293 294 295 296 297 298 |
# File 'lib/jms/connection.rb', line 290 def session(params={}, &proc) raise "Missing mandatory Block when calling JMS::Connection#session" unless proc session = self.create_session(params) begin proc.call(session) ensure session.close end end |
#start ⇒ Object
Start (or restart) delivery of incoming messages over this connection. By default no messages are delivered until this method is called explicitly Delivery of messages to any asynchronous Destination::each() call will only start after Connection::start is called, or Connection.start is used
265 266 267 |
# File 'lib/jms/connection.rb', line 265 def start @jms_connection.start end |
#stop ⇒ Object
Temporarily stop delivery of incoming messages on this connection Useful during a hot code update or other changes that need to be completed without any new messages being processed Call start() to resume receiving messages
273 274 275 |
# File 'lib/jms/connection.rb', line 273 def stop @jms_connection.stop end |
#to_s ⇒ Object
Return a string describing the JMS provider and version
396 397 398 399 |
# File 'lib/jms/connection.rb', line 396 def to_s md = @jms_connection.getMetaData "JMS::Connection provider: #{md.getJMSProviderName} v#{md.getProviderVersion}, JMS v#{md.getJMSVersion}" end |