Class: JMS::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/jms/connection.rb

Overview

Every JMS session must have at least one Connection instance A Connection instance represents a connection between this client application and the JMS Provider (server/queue manager/broker). A connection is distinct from a Session, in that multiple Sessions can share a single connection. Also, unit of work control (commit/rollback) is performed at the Session level.

Since many JRuby applications will only have one connection and one session several convenience methods have been added to support creating both the Session and Connection objects automatically.

For Example, to read all messages from a queue and then terminate:

require 'rubygems'
require 'jms'

JMS::Connection.create_session(
  :factory => 'org.apache.activemq.ActiveMQConnectionFactory',
  :broker_url => 'tcp://localhost:61616',
  :require_jars => [
    '~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar',
    '~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar',
    '~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar',
  ]
) do |session|
  session.consumer(:queue_name=>'TEST') do |consumer|
    if message = consumer.receive_no_wait
      puts "Data Received: #{message.data}"
    else
      puts 'No message available'
    end
  end
end

The above code creates a Connection and then a Session. Once the block completes the session is closed and the Connection disconnected.

See: download.oracle.com/javaee/6/api/javax/jms/Connection.html

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ Connection

Create a connection to the JMS provider

Note: Connection::start must be called before any consumers will be

able to receive messages

In JMS we need to start by obtaining the JMS Factory class that is supplied by the JMS Vendor.

There are 3 ways to establish a connection to a JMS Provider

1. Supply the name of the JMS Providers Factory Class
2. Supply an instance of the JMS Provider class itself
3. Use a JNDI lookup to return the JMS Provider Factory class

Parameters:

:factory   => String: Name of JMS Provider Factory class
           => Class: JMS Provider Factory class itself

:jndi_name    => String: Name of JNDI entry at which the Factory can be found
:jndi_context => Mandatory if jndi lookup is being used, contains details
                 on how to connect to JNDI server etc.

:require_jars => An optional array of Jar file names to load for the specified
                 JMS provider. By using this option it is not necessary
                 to put all the JMS Provider specific jar files into the
                 environment variable CLASSPATH prior to starting JRuby

:username  => Username to connect to JMS provider with
:password  => Password to use when to connecting to the JMS provider
              Note: :password is ignored if :username is not supplied

:factory and :jndi_name are mutually exclusive, both cannot be supplied at the same time. :factory takes precedence over :jndi_name

JMS Provider specific properties can be set if the JMS Factory itself has setters for those properties.

For some known examples, see: [Example jms.yml](github.com/reidmorrison/jruby-jms/blob/master/examples/jms.yml)



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/jms/connection.rb', line 167

def initialize(params = {})
  # Used by ::on_message
  @sessions = []
  @consumers = []

  options = params.dup

  # Load Jar files on demand so that they do not need to be in the CLASSPATH
  # of JRuby lib directory
  fetch_dependencies(options.delete(:require_jars))

  connection_factory = nil
  factory = options.delete(:factory)
  if factory
    # If factory is a string, then it is the name of a class, not the class itself
    factory = eval(factory) if factory.respond_to? :to_str
    connection_factory = factory.new
  elsif jndi_name = options[:jndi_name]
    raise "Missing mandatory parameter :jndi_context missing in call to Connection::connect" unless jndi_context = options[:jndi_context]
    jndi = javax.naming.InitialContext.new(java.util.Hashtable.new(jndi_context))
    begin
      connection_factory = jndi.lookup jndi_name
    ensure
      jndi.close
    end
  else
    raise "Missing mandatory parameter :factory or :jndi_name missing in call to Connection::connect"
  end
  options.delete(:jndi_name)
  options.delete(:jndi_context)

  JMS::logger.debug "Using Factory: #{connection_factory.java_class}" if connection_factory.respond_to? :java_class
  options.each_pair do |key, val|
    next if [:username, :password].include?(key)

    method = key.to_s+'='
    if connection_factory.respond_to? method
      connection_factory.send method, val
      JMS::logger.debug "   #{key} = #{connection_factory.send key.to_sym}" if connection_factory.respond_to? key.to_sym
    else
      JMS::logger.warn "#{connection_factory.java_class} does not understand option: :#{key}=#{val}, ignoring :#{key}" if connection_factory.respond_to? :java_class
    end
  end

  # Check for username and password
  if options[:username]
    @jms_connection = connection_factory.create_connection(options[:username], options[:password])
  else
    @jms_connection = connection_factory.create_connection
  end
end

Class Method Details

.session(params = {}, &proc) ⇒ Object

Connect to a JMS Broker, create and start the session, then call the code block passing in the session. Both the Session and Connection are closed on termination of the block

Shortcut convenience method to both connect to the broker and create a session Useful when only a single session is required in the current thread

Note: It is important that each thread have its own session to support transactions

This method will also start the session immediately so that any
consumers using this session will start immediately


83
84
85
86
87
# File 'lib/jms/connection.rb', line 83

def self.session(params = {}, &proc)
  self.start(params) do |connection|
    connection.session(params, &proc)
  end
end

.start(params = {}, &proc) ⇒ Object

Create a connection to the JMS provider, start the connection, call the supplied code block, then close the connection upon completion

Returns the result of the supplied block



62
63
64
65
66
67
68
69
70
71
# File 'lib/jms/connection.rb', line 62

def self.start(params = {}, &proc)
  raise "Missing mandatory Block when calling JMS::Connection.start" unless proc
  connection = Connection.new(params)
  connection.start
  begin
    proc.call(connection)
  ensure
    connection.close
  end
end

Instance Method Details

#client_idObject

Gets the client identifier for this connection.



326
327
328
# File 'lib/jms/connection.rb', line 326

def client_id
  @jms_connection.getClientID
end

#client_id=(client_id) ⇒ Object

Sets the client identifier for this connection.



331
332
333
# File 'lib/jms/connection.rb', line 331

def client_id=(client_id)
  @jms_connection.setClientID(client_id)
end

#closeObject

Close connection with the JMS Provider First close any consumers or sessions that are active as a result of JMS::Connection::on_message



315
316
317
318
319
320
321
322
323
# File 'lib/jms/connection.rb', line 315

def close
  @consumers.each {|consumer| consumer.close } if @consumers
  @consumers = []

  @sessions.each {|session| session.close} if @sessions
  @session=[]

  @jms_connection.close if @jms_connection
end

#create_session(params = {}) ⇒ Object

Create a session over this connection. It is recommended to create separate sessions for each thread

Note: Remember to call close on the returned session when it is no longer

needed. Rather use JMS::Connection#session with a block whenever
possible

Parameters:

:transacted => true or false
    Determines whether transactions are supported within this session.
    I.e. Whether commit or rollback can be called
    Default: false
    Note: :options below are ignored if this value is set to :true

:options => any of the JMS::Session constants:
   Note: :options are ignored if :transacted => true
   JMS::Session::AUTO_ACKNOWLEDGE
      With this acknowledgment mode, the session automatically acknowledges
      a client's receipt of a message either when the session has successfully
      returned from a call to receive or when the message listener the session has
      called to process the message successfully returns.
   JMS::Session::CLIENT_ACKNOWLEDGE
      With this acknowledgment mode, the client acknowledges a consumed
      message by calling the message's acknowledge method.
   JMS::Session::DUPS_OK_ACKNOWLEDGE
      This acknowledgment mode instructs the session to lazily acknowledge
      the delivery of messages.
   JMS::Session::SESSION_TRANSACTED
      This value is returned from the method getAcknowledgeMode if the
      session is transacted.
   Default: JMS::Session::AUTO_ACKNOWLEDGE


307
308
309
310
311
# File 'lib/jms/connection.rb', line 307

def create_session(params={})
  transacted = params[:transacted] || false
  options = params[:options] || JMS::Session::AUTO_ACKNOWLEDGE
  @jms_connection.create_session(transacted, options)
end

#create_session_pool(params = {}) ⇒ Object

Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.

Each thread can request a session and then return it once it is no longer needed by that thread. The only way to get a session is to pass a block so that the Session is automatically returned to the pool upon completion of the block.

Parameters:

see regular session parameters from: JMS::Connection#initialize

Additional parameters for controlling the session pool itself

:pool_name         Name of the pool as it shows up in the logger.
                   Default: 'JMS::SessionPool'
:pool_size         Maximum Pool Size. Default: 10
                   The pool only grows as needed and will never exceed
                   :pool_size
:pool_warn_timeout Number of seconds to wait before logging a warning when a
                   session in the pool is not available. Measured in seconds
                   Default: 5.0
:pool_logger       Supply a logger that responds to #debug, #info, #warn and #debug?
                   For example: Rails.logger
                   Default: None

Example:

session_pool = connection.create_session_pool(config)

session_pool.session do |session|
   producer.send(session.message("Hello World"))
end


514
515
516
517
# File 'lib/jms/connection.rb', line 514

def create_session_pool(params={})
  require 'jms/session_pool' unless defined? JMS::SessionPool
  JMS::SessionPool.new(self, params)
end

#exception_listenerObject

Returns the ExceptionListener object for this connection Returned class implements interface JMS::ExceptionListener



337
338
339
# File 'lib/jms/connection.rb', line 337

def exception_listener
  @jms_connection.getExceptionListener
end

#exception_listener=(listener) ⇒ Object

Sets an exception listener for this connection See ::on_exception to set a Ruby Listener Returns: nil



344
345
346
# File 'lib/jms/connection.rb', line 344

def exception_listener=(listener)
  @jms_connection.setExceptionListener(listener)
end

#fetch_dependencies(jar_list) ⇒ Object

Load the required jar files for this JMS Provider and load JRuby extensions for those classes

Rather than copying the JMS jar files into the JRuby lib, load them on demand. JRuby JMS extensions are only loaded once the jar files have been loaded.

Can be called multiple times if required, although it would not be performant to do so regularly.

Parameter: jar_list is an Array of the path and filenames to jar files

to load for this JMS Provider

Returns nil

TODO make this a class method



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/jms/connection.rb', line 105

def fetch_dependencies(jar_list)
  jar_list.each do |jar|
    JMS::logger.debug "Loading Jar File:#{jar}"
    begin
      require jar
    rescue Exception => exc
      JMS::logger.error "Failed to Load Jar File:#{jar}. #{exc.to_s}"
    end
  end if jar_list

  require 'jms/mq_workaround'
  require 'jms/imports'
  require 'jms/message_listener_impl'
  require 'jms/message'
  require 'jms/text_message'
  require 'jms/map_message'
  require 'jms/bytes_message'
  require 'jms/object_message'
  require 'jms/session'
  require 'jms/message_consumer'
  require 'jms/message_producer'
  require 'jms/queue_browser'
  require 'jms/oracle_a_q_connection_factory'
end

#meta_dataObject

Gets the metadata for this connection see: download.oracle.com/javaee/6/api/javax/jms/ConnectionMetaData.html



367
368
369
# File 'lib/jms/connection.rb', line 367

def 
  @jms_connection.
end

#on_exception(&block) ⇒ Object

Whenever an exception occurs the supplied block is called This is important when Connection::on_message has been used, since failures to the connection would be lost otherwise

For details on the supplied parameter when the block is called, see: download.oracle.com/javaee/6/api/javax/jms/JMSException.html

Example:

connection.on_exception do |jms_exception|
  puts "JMS Exception has occurred: #{jms_exception}"
end

Returns: nil



361
362
363
# File 'lib/jms/connection.rb', line 361

def on_exception(&block)
  @jms_connection.setExceptionListener(block)
end

#on_message(params, &block) ⇒ Object

Receive messages in a separate thread when they arrive

Allows messages to be received Asynchronously in a separate thread. This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.

Session Parameters:

:transacted => true or false
    Determines whether transactions are supported within this session.
    I.e. Whether commit or rollback can be called
    Default: false
    Note: :options below are ignored if this value is set to :true

:options => any of the JMS::Session constants:
   Note: :options are ignored if :transacted => true
   JMS::Session::AUTO_ACKNOWLEDGE
      With this acknowledgment mode, the session automatically acknowledges
      a client's receipt of a message either when the session has successfully
      returned from a call to receive or when the message listener the session has
      called to process the message successfully returns.
   JMS::Session::CLIENT_ACKNOWLEDGE
      With this acknowledgment mode, the client acknowledges a consumed
      message by calling the message's acknowledge method.
   JMS::Session::DUPS_OK_ACKNOWLEDGE
      This acknowledgment mode instructs the session to lazily acknowledge
      the delivery of messages.
   JMS::Session::SESSION_TRANSACTED
      This value is returned from the method getAcknowledgeMode if the
      session is transacted.
   Default: JMS::Session::AUTO_ACKNOWLEDGE

 :session_count : Number of sessions to create, each with their own consumer which
                  in turn will call the supplied code block.
                  Note: The supplied block must be thread safe since it will be called
                        by several threads at the same time.
                        I.e. Don't change instance variables etc. without the
                        necessary semaphores etc.
                  Default: 1

Consumer Parameters:

:queue_name => String: Name of the Queue to return
               Symbol: :temporary => Create temporary queue
               Mandatory unless :topic_name is supplied
  Or,
:topic_name => String: Name of the Topic to write to or subscribe to
               Symbol: :temporary => Create temporary topic
               Mandatory unless :queue_name is supplied
  Or,
:destination=> Explicit javaxJms::Destination to use

:selector   => Filter which messages should be returned from the queue
               Default: All messages

:no_local   => Determine whether messages published by its own connection
               should be delivered to the supplied block
               Default: false

:statistics Capture statistics on how many messages have been read
   true  : This method will capture statistics on the number of messages received
           and the time it took to process them.
           The timer starts when each() is called and finishes when either the last message was received,
           or when Destination::statistics is called. In this case MessageConsumer::statistics
           can be called several times during processing without affecting the end time.
           Also, the start time and message count is not reset until MessageConsumer::each
           is called again with :statistics => true

Usage: For transacted sessions the block supplied must return either true or false:

true => The session is committed
false => The session is rolled back
Any Exception => The session is rolled back

Note: Separately invoke Connection#on_exception so that connection failures can be handled

since on_message will Not be called if the connection is lost


452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/jms/connection.rb', line 452

def on_message(params, &block)
  raise "JMS::Connection must be connected prior to calling JMS::Connection::on_message" unless @sessions && @consumers

  consumer_count = params[:session_count] || 1
  consumer_count.times do
    session = self.create_session(params)
    consumer = session.consumer(params)
    if session.transacted?
      consumer.on_message(params) do |message|
        begin
          block.call(message) ? session.commit : session.rollback
        rescue => exc
          session.rollback
          throw exc
        end
      end
    else
      consumer.on_message(params, &block)
    end
    @consumers << consumer
    @sessions << session
  end
end

#on_message_statisticsObject

Return the statistics for every active Connection#on_message consumer in an Array

For details on the contents of each element in the array, see: Consumer#on_message_statistics



480
481
482
# File 'lib/jms/connection.rb', line 480

def on_message_statistics
  @consumers.collect { |consumer| consumer.on_message_statistics }
end

#session(params = {}, &proc) ⇒ Object

Create a session over this connection. It is recommended to create separate sessions for each thread If a block of code is passed in, it will be called and then the session is automatically closed on completion of the code block

Parameters:

:transacted => true or false
    Determines whether transactions are supported within this session.
    I.e. Whether commit or rollback can be called
    Default: false
    Note: :options below are ignored if this value is set to :true

:options => any of the JMS::Session constants:
   Note: :options are ignored if :transacted => true
   JMS::Session::AUTO_ACKNOWLEDGE
      With this acknowledgment mode, the session automatically acknowledges
      a client's receipt of a message either when the session has successfully
      returned from a call to receive or when the message listener the session has
      called to process the message successfully returns.
   JMS::Session::CLIENT_ACKNOWLEDGE
      With this acknowledgment mode, the client acknowledges a consumed
      message by calling the message's acknowledge method.
   JMS::Session::DUPS_OK_ACKNOWLEDGE
      This acknowledgment mode instructs the session to lazily acknowledge
      the delivery of messages.
   JMS::Session::SESSION_TRANSACTED
      This value is returned from the method getAcknowledgeMode if the
      session is transacted.
   Default: JMS::Session::AUTO_ACKNOWLEDGE


265
266
267
268
269
270
271
272
273
# File 'lib/jms/connection.rb', line 265

def session(params={}, &proc)
  raise "Missing mandatory Block when calling JMS::Connection#session" unless proc
  session = self.create_session(params)
  begin
    proc.call(session)
  ensure
    session.close
  end
end

#startObject

Start (or restart) delivery of incoming messages over this connection. By default no messages are delivered until this method is called explicitly Delivery of messages to any asynchronous Destination::each() call will only start after Connection::start is called, or Connection.start is used



223
224
225
# File 'lib/jms/connection.rb', line 223

def start
  @jms_connection.start
end

#stopObject

Temporarily stop delivery of incoming messages on this connection Useful during a hot code update or other changes that need to be completed without any new messages being processed Call start() to resume receiving messages



231
232
233
# File 'lib/jms/connection.rb', line 231

def stop
  @jms_connection.stop
end

#to_sObject

Return a string describing the JMS provider and version



372
373
374
375
# File 'lib/jms/connection.rb', line 372

def to_s
  md = @jms_connection.
  "JMS::Connection provider: #{md.getJMSProviderName} v#{md.getProviderVersion}, JMS v#{md.getJMSVersion}"
end