Class: TorqueBox::Messaging::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/torquebox/messaging/session.rb

Direct Known Subclasses

TransactedSession, XaSession

Constant Summary collapse

AUTO_ACK =
javax.jms::Session::AUTO_ACKNOWLEDGE
CLIENT_ACK =
javax.jms::Session::CLIENT_ACKNOWLEDGE
DUPS_OK_ACK =
javax.jms::Session::DUPS_OK_ACKNOWLEDGE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jms_session) ⇒ Session

Returns a new instance of Session.



29
30
31
# File 'lib/torquebox/messaging/session.rb', line 29

def initialize(jms_session)
  @jms_session = jms_session
end

Instance Attribute Details

#jms_sessionObject

Returns the value of attribute jms_session.



27
28
29
# File 'lib/torquebox/messaging/session.rb', line 27

def jms_session
  @jms_session
end

Class Method Details

.canonical_ack_mode(ack_mode) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/torquebox/messaging/session.rb', line 154

def self.canonical_ack_mode(ack_mode)
  case ( ack_mode )
    when Fixnum
      return ack_mode
    when :auto
      return AUTO_ACK
    when :client
      return CLIENT_ACK
    when :dups_ok
      return DUPS_OK_ACK
  end
end

Instance Method Details

#closeObject



33
34
35
# File 'lib/torquebox/messaging/session.rb', line 33

def close
  @jms_session.close
end

#create_browser(*args) ⇒ Object



139
140
141
# File 'lib/torquebox/messaging/session.rb', line 139

def create_browser(*args)
  jms_session.create_browser( *args )
end

#java_destination(destination) ⇒ Object



143
144
145
146
147
148
149
150
151
152
# File 'lib/torquebox/messaging/session.rb', line 143

def java_destination(destination)
  java_destination = destination.name
  
  unless java_destination.is_a?( javax.jms.Destination )
    meth = destination.is_a?( Queue ) ? :create_queue : :create_topic
    java_destination = @jms_session.send( meth, java_destination )
  end
  
  java_destination
end

#publish(destination, payload, options = {}) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/torquebox/messaging/session.rb', line 41

def publish(destination, payload, options={})
  producer    = @jms_session.create_producer( java_destination( destination ) )
  message     = Message.new( @jms_session, payload, options[:encoding] )

  message.populate_message_headers(options)
  message.populate_message_properties(options[:properties])

  producer.send( message.jms_message,
                 options.fetch(:delivery_mode, producer.delivery_mode),
                 options.fetch(:priority, producer.priority),
                 options.fetch(:ttl, producer.time_to_live) )
  message
end

#publish_and_receive(destination, message, options = {}) ⇒ Object

Implement the request-response pattern. Sends a message to the request destination and waits for a reply on the response destination.

Options:

:timeout - specifies the time in miliseconds to wait for answer,

default: 10000 (10s)

:decode - pass false to return the original JMS TextMessage,

default: true


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/torquebox/messaging/session.rb', line 94

def publish_and_receive(destination, message, options = {})
  options[:timeout] ||= 10000 # 10s
  decode = options.fetch(:decode, false)
  options[:properties] ||= {}
  options[:properties]["synchronous"] = "true"
  wrapped_message = { :timeout => options[:timeout], :message => message }
  message = publish(destination, wrapped_message, options)
    
  options[:selector] = "JMSCorrelationID='#{message.jms_message.jms_message_id}'"
  response = receive(destination, options)
    
  if response
    decode ? Message.new( response ).decode : response
  end
end

#queue_for(name) ⇒ Object



37
38
39
# File 'lib/torquebox/messaging/session.rb', line 37

def queue_for(name)
  Queue.new( @jms_session.create_queue( name ) )
end

#receive(destination, options = {}) ⇒ Object

Returns decoded message, by default. Pass :decode=>false to return the original JMS TextMessage. Pass :timeout to give up after a number of milliseconds



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/torquebox/messaging/session.rb', line 58

def receive(destination, options={})
  decode = options.fetch(:decode, true)
  timeout = options.fetch(:timeout, 0)
  selector = options[:selector]
  
  java_destination = java_destination( destination )
  if options[:durable] && java_destination.class.name =~ /Topic/
    consumer = @jms_session.createDurableSubscriber( java_destination,
                                                     options.fetch(:subscriber_name, Topic::DEFAULT_SUBSCRIBER_NAME),
                                                     selector,
                                                     false )
  else
    consumer = @jms_session.createConsumer( java_destination, selector )      
  end
  begin
    jms_message = consumer.receive( timeout )
    if jms_message
      message = decode ? Message.new( jms_message ).decode : jms_message
      block_given? ? yield(message) : message
    end
  ensure
    consumer.close unless consumer.nil?
  end
end

#receive_and_publish(destination, options = {}) ⇒ Object

Receiving end of the request-response pattern. The return value of the block passed to this method is the response sent back to the client. If no block is given then request is returned as the response.



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/torquebox/messaging/session.rb', line 114

def receive_and_publish(destination, options = {})
  selector = "synchronous = 'true'"
  selector = "#{selector} and (#{options[:selector]})" if options[:selector]
  receive_options = options.merge(:decode => false,
                                  :selector => selector)
    
  request = receive(destination, receive_options)
  unless request.nil?
    decoded_request = Message.new( request ).decode
    request_message = decoded_request[:message]
    # Base the response ttl off the original request timeout
    request_timeout = decoded_request[:timeout]
    options[:ttl] ||= request_timeout
    
    response = block_given? ? yield(request_message) : request_message
    
    options[:correlation_id] = request.jms_message_id
    publish(destination, response, options)
  end
end

#unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME) ⇒ Object



135
136
137
# File 'lib/torquebox/messaging/session.rb', line 135

def unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME)
  @jms_session.unsubscribe( subscriber_name )
end