Class: JMS::SessionPool

Inherits:
Object
Defined in:
lib/jms/session_pool.rb

Overview

Since a Session can only be used by one thread at a time, we could create a Session for every thread. That could result in excessive unused Sessions. An alternative is to create a pool of sessions that can be shared by multiple threads.

Each thread can request a session and then return it once that thread no longer needs it. The only way to get a session is to pass a block, so that the Session is automatically returned to the pool when the block completes.
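The block-based checkout described above can be illustrated with a minimal pure-Ruby sketch. TinyPool and its `with` method are hypothetical names used only for illustration; this is not how JMS::SessionPool is implemented (it delegates to GenePool), and timeouts and logging are left out.

```ruby
# Hypothetical stand-in for a bounded pool; not the JMS::SessionPool implementation.
class TinyPool
  attr_reader :created

  def initialize(size, &factory)
    @size    = size      # upper bound on resources ever created
    @factory = factory   # block that builds a new resource on demand
    @idle    = []        # resources currently available for checkout
    @created = 0
    @mutex   = Mutex.new
    @cond    = ConditionVariable.new
  end

  # Yield a resource to the block and always hand it back afterwards.
  def with
    resource = checkout
    begin
      yield resource
    ensure
      checkin(resource)
    end
  end

  private

  def checkout
    @mutex.synchronize do
      loop do
        return @idle.pop unless @idle.empty?
        if @created < @size
          resource = @factory.call   # grow lazily, only when needed
          @created += 1
          return resource
        end
        @cond.wait(@mutex)           # pool exhausted: wait for a checkin
      end
    end
  end

  def checkin(resource)
    @mutex.synchronize do
      @idle.push(resource)
      @cond.signal
    end
  end
end

# Five threads share a pool that never holds more than two resources.
pool    = TinyPool.new(2) { Object.new }
threads = 5.times.map { Thread.new { pool.with { |_resource| sleep 0.01 } } }
threads.each(&:join)
answer  = pool.with { |_resource| 42 }   # the block's value is passed through
```

Because the caller never holds a resource outside the block, it cannot forget to return it, which is the reason the pool exposes only a block form.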

Parameters:

See the regular session parameters in: JMS::Connection#initialize

Additional parameters for controlling the session pool itself:

:pool_size         Maximum Pool Size. Default: 10
                   The pool only grows as needed and will never exceed
                   :pool_size
:pool_timeout      Number of seconds to wait before raising a TimeoutError
                   if no sessions are available in the pool.
                   Default: 60
:pool_warn_timeout Number of seconds to wait before logging a warning when a
                   session in the pool is not available
                   Default: 5
:pool_name         Name of the pool as it shows up in the logger.
                   Default: 'JMS::SessionPool'
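As a rough illustration of how these defaults apply, the sketch below resolves the pool options from a params hash using the same `||` fallback style as the constructor. POOL_DEFAULTS and pool_settings are hypothetical names introduced for this example and are not part of the library.

```ruby
# Hypothetical helper; the default values are taken from the parameter list above.
POOL_DEFAULTS = {
  pool_size:         10,
  pool_timeout:      60,
  pool_warn_timeout: 5,
  pool_name:         'JMS::SessionPool'
}.freeze

# Return the effective pool settings for the supplied params (which may be nil).
def pool_settings(params)
  params ||= {}
  POOL_DEFAULTS.each_with_object({}) do |(key, default), settings|
    # `||` means an explicit nil (or false) also falls back to the default
    settings[key] = params[key] || default
  end
end

config   = { pool_size: 3, pool_timeout: 30 }
settings = pool_settings(config)
puts settings[:pool_size]          # prints 3
puts settings[:pool_warn_timeout]  # prints 5
```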

Example:

session_pool = connection.create_session_pool(config)
session_pool.session do |session|
   ....
end

Constructor Details

#initialize(connection, params = {}) ⇒ SessionPool

Returns a new instance of SessionPool.



# File 'lib/jms/session_pool.rb', line 34

def initialize(connection, params={})
  # Save Session params since it will be used every time a new session is
  # created in the pool
  session_params = params.nil? ? {} : params.dup
  logger         = SemanticLogger[session_params[:pool_name] || self.class]

  # Use GenePool to create and manage the pool of sessions
  @pool          = GenePool.new(
    name:         '',
    pool_size:    session_params[:pool_size] || 10,
    warn_timeout: session_params[:pool_warn_timeout] || 5,
    timeout:      session_params[:pool_timeout] || 60,
    close_proc:   nil,
    logger:       logger
  ) do
    session                      = connection.create_session(session_params)
    # Turn on Java class persistence: https://github.com/jruby/jruby/wiki/Persistence
    session.class.__persistent__ = true
    session
  end

  # Handle connection failures
  connection.on_exception do |jms_exception|
    logger.error "JMS Connection Exception has occurred: #{jms_exception.inspect}"
  end
end

Instance Method Details

#close ⇒ Object

Immediately closes all sessions in the pool and releases them from the pool.

Note: This is an immediate close; active sessions will be aborted.

Note: Once closed, a session pool cannot be reused. A new instance must be created.


# File 'lib/jms/session_pool.rb', line 156

def close
  @pool.each { |s| s.close }
end

#consumer(params, &block) ⇒ Object

Obtain a session from the pool and create a MessageConsumer. Pass both into the supplied block. Once the block completes, the consumer is closed and the session is returned to the pool.

Parameters:

queue_name: [String] Name of the Queue to return
            [Symbol] Create temporary queue
               Mandatory unless :topic_name is supplied
  Or,
topic_name: [String] Name of the Topic to write to or subscribe to
            [Symbol] Create temporary topic
               Mandatory unless :queue_name is supplied
  Or,
destination: [javaxJms::Destination] Destination to use

selector:   Filter which messages should be returned from the queue
               Default: All messages
no_local:   Determine whether messages published by its own connection
               should be delivered to it
               Default: false

Example

session_pool.consumer(queue_name: 'MyQueue') do |session, consumer|
  message = consumer.receive(timeout)
  puts message.data if message
end


# File 'lib/jms/session_pool.rb', line 108

def consumer(params, &block)
  session do |s|
    begin
      consumer = s.consumer(params)
      block.call(s, consumer)
    ensure
      consumer.close if consumer
    end
  end
end
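The nesting in the method above implies an order of cleanup: the consumer is closed first, then the session goes back to the pool, and only then does an error from the block reach the caller. The sketch below records that order using stand-in objects. FakePool and FakeConsumer are hypothetical and do not talk to a JMS provider.

```ruby
events = []

# Hypothetical stand-in for a MessageConsumer that records when it is closed.
FakeConsumer = Struct.new(:events) do
  def close
    events << :consumer_closed
  end
end

# Hypothetical stand-in for the pool; it only records when a session is returned.
class FakePool
  def initialize(events)
    @events = events
  end

  def session
    yield :fake_session
  ensure
    @events << :session_returned
  end

  # Same begin/ensure shape as the consumer method shown above.
  def consumer
    session do |s|
      begin
        consumer = FakeConsumer.new(@events)
        yield s, consumer
      ensure
        consumer.close if consumer
      end
    end
  end
end

pool = FakePool.new(events)
begin
  pool.consumer { |_session, _consumer| raise 'processing failed' }
rescue RuntimeError
  events << :error_seen_by_caller
end

p events  # prints [:consumer_closed, :session_returned, :error_seen_by_caller]
```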

#producer(params, &block) ⇒ Object

Obtain a session from the pool and create a MessageProducer. Pass both into the supplied block. Once the block completes, the producer is closed and the session is returned to the pool.

Parameters:

queue_name: [String] Name of the Queue to return
            [Symbol] Create temporary queue
               Mandatory unless :topic_name is supplied
  Or,
topic_name: [String] Name of the Topic to write to or subscribe to
            [Symbol] Create temporary topic
               Mandatory unless :queue_name is supplied
  Or,
destination: [javaxJms::Destination] Destination to use

Example

session_pool.producer(queue_name: 'ExampleQueue') do |session, producer|
  producer.send(session.message("Hello World"))
end


# File 'lib/jms/session_pool.rb', line 139

def producer(params, &block)
  session do |s|
    begin
      producer = s.producer(params)
      block.call(s, producer)
    ensure
      producer.close if producer
    end
  end
end

#session(&block) ⇒ Object

Obtain a session from the pool and pass it to the supplied block. The session is automatically returned to the pool once the block completes.

If a JMS exception is raised, the session is closed and removed from the pool to prevent reuse of sessions that are no longer valid.



# File 'lib/jms/session_pool.rb', line 66

def session(&block)
  s = nil
  begin
    s = @pool.checkout
    block.call(s)
  rescue javax.jms.JMSException => e
    s.close rescue nil
    @pool.remove(s)
    s = nil # Do not check back in since we have removed it
    raise e
  ensure
    @pool.checkin(s) if s
  end
end
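The difference between a JMS failure and any other error can be seen in a small stand-alone sketch. JMSError, FakeSession and DiscardingPool are hypothetical stand-ins (JMSError plays the role of javax.jms.JMSException), and the pool here is a plain array rather than GenePool, so pool size limits and timeouts are not modelled.

```ruby
# Hypothetical stand-in for javax.jms.JMSException.
class JMSError < StandardError; end

# Hypothetical stand-in for a JMS session.
class FakeSession
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

class DiscardingPool
  attr_reader :idle

  def initialize
    @idle = []
  end

  def session
    s = @idle.pop || FakeSession.new
    yield s
  rescue JMSError
    s.close rescue nil   # the session may already be unusable
    s = nil              # mark it as discarded so it is not checked back in
    raise
  ensure
    @idle.push(s) if s
  end
end

pool = DiscardingPool.new
pool.session { |_s| :ok }                 # healthy session returns to the pool
first = pool.idle.first

begin
  pool.session { |_s| raise JMSError, 'connection lost' }
rescue JMSError
  # the session used above was closed and dropped
end
after_jms_error = pool.idle.size          # 0

begin
  pool.session { |_s| raise ArgumentError, 'bug in caller' }
rescue ArgumentError
  # non-JMS errors leave the session intact
end
after_other_error = pool.idle.size        # 1
```

Only the JMS-style error discards the session; any other exception still propagates to the caller, but the session is checked back in because it is assumed to remain valid.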