Class: TorqueBox::Messaging::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/torquebox/messaging/session.rb

Direct Known Subclasses

XaSession

Constant Summary collapse

AUTO_ACK =
javax.jms::Session::AUTO_ACKNOWLEDGE
CLIENT_ACK =
javax.jms::Session::CLIENT_ACKNOWLEDGE
DUPS_OK_ACK =
javax.jms::Session::DUPS_OK_ACKNOWLEDGE
SESSION_TRANSACTED =
javax.jms::Session::SESSION_TRANSACTED

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jms_session) ⇒ Session

Returns a new instance of Session.



30
31
32
# File 'lib/torquebox/messaging/session.rb', line 30

# Builds a Session around an existing JMS session object.
#
# jms_session - the session object that every other method on this
#               class delegates to; it is stored as-is.
def initialize(jms_session)
  @jms_session = jms_session
end

Instance Attribute Details

#jms_session ⇒ Object

Returns the value of attribute jms_session.



28
29
30
# File 'lib/torquebox/messaging/session.rb', line 28

# Returns the underlying session object that was handed to the
# constructor.
def jms_session
  @jms_session
end

Class Method Details

.canonical_ack_mode(ack_mode) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/torquebox/messaging/session.rb', line 158

# Normalizes an acknowledgement mode to the integer value used by JMS.
#
# ack_mode - either an Integer, which is returned unchanged, or one of
#            the symbols :auto, :client or :dups_ok.
#
# Returns the corresponding integer constant (AUTO_ACK, CLIENT_ACK or
# DUPS_OK_ACK), or nil when ack_mode is not one of the recognized values.
def self.canonical_ack_mode(ack_mode)
  case ack_mode
  # Match on Integer rather than Fixnum: Fixnum was deprecated in Ruby 2.4
  # and removed in 3.2, while Integer matches the same values everywhere.
  when Integer  then ack_mode
  when :auto    then AUTO_ACK
  when :client  then CLIENT_ACK
  when :dups_ok then DUPS_OK_ACK
  end
end

Instance Method Details

#close ⇒ Object



34
35
36
# File 'lib/torquebox/messaging/session.rb', line 34

# Closes the wrapped session by delegating to its own close method.
def close
  @jms_session.close
end

#create_browser(*args) ⇒ Object



148
149
150
# File 'lib/torquebox/messaging/session.rb', line 148

# Creates a browser on the wrapped session.
#
# args - forwarded unchanged to the session's create_browser method.
#
# Returns whatever the underlying create_browser call returns.
def create_browser(*args)
  jms_session.create_browser( *args )
end

#java_destination(destination) ⇒ Object



152
153
154
155
156
# File 'lib/torquebox/messaging/session.rb', line 152

# Resolves a destination wrapper to the destination object used by the
# underlying session.
#
# destination - an object responding to java_destination and name.
#
# When the wrapper already carries a non-nil java_destination it is
# returned directly. Otherwise the session is asked to create one by
# name: a queue if the wrapper is a Queue, a topic in every other case.
def java_destination(destination)
  if destination.java_destination.nil?
    factory = if destination.is_a?( Queue )
                :create_queue
              else
                :create_topic
              end
    @jms_session.send( factory, destination.name )
  else
    destination.java_destination
  end
end

#publish(destination, payload, options = {}) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/torquebox/messaging/session.rb', line 42

# Publishes a payload to a destination.
#
# destination - destination wrapper, resolved through java_destination.
# payload     - the content handed to Message.new.
# options     - Hash of options (never modified by this method):
#               :encoding      - passed to Message.new
#               :properties    - Hash of message properties
#               :scheduled     - Time at which the message should be delivered
#               :delivery_mode, :priority, :ttl - override the producer's
#                                defaults for this send
#               The whole hash is also handed to
#               Message#populate_message_headers.
#
# Returns the Message wrapper that was sent.
def publish(destination, payload, options={})
  # Work on copies so neither the caller's options hash nor its nested
  # :properties hash is changed (this also allows frozen hashes).
  options    = options.dup
  properties = ( options[:properties] || {} ).dup
  options[:properties] = properties

  producer = @jms_session.create_producer( java_destination( destination ) )
  begin
    message = Message.new( @jms_session, payload, options[:encoding] )

    # This will let us create messages to be scheduled later like this:
    #
    # queue.publish(:ham => :biscuit, :scheduled => Time.now + 10)
    # queue.publish(:ham => :biscuit, :scheduled => Time.now + 2.5)
    #
    # In Rails it is possible to do:
    #
    # queue.publish(:ham => :biscuit, :scheduled => 3.minutes.from_now)
    #
    # Please note that the :scheduled parameter takes a Time object.
    if options.has_key?(:scheduled)
      # Time#to_f is in seconds; the header is set to whole milliseconds.
      properties[Java::org.hornetq.api.core.Message::HDR_SCHEDULED_DELIVERY_TIME.to_s] = (options[:scheduled].to_f * 1000).to_i
    end

    message.populate_message_headers(options)
    message.populate_message_properties(properties)

    producer.send( message.jms_message,
                   options.fetch(:delivery_mode, producer.delivery_mode),
                   options.fetch(:priority, producer.priority),
                   options.fetch(:ttl, producer.time_to_live) )
    message
  ensure
    # Release the producer once the send is done, mirroring how receive
    # closes its consumer.
    producer.close
  end
end

#publish_and_receive(destination, message, options = {}) ⇒ Object

Implement the request-response pattern. Sends a message to the request destination and waits for a reply on the response destination.

Options:

:timeout - specifies the time in milliseconds to wait for an answer,

default: 10000 (10s)

:decode - pass false to return the original JMS TextMessage,

default: true


111
112
113
114
115
116
117
118
119
# File 'lib/torquebox/messaging/session.rb', line 111

# Request side of the request-response pattern: publishes a message
# flagged as synchronous, then receives from the same destination using a
# selector on the published message's id.
#
# destination - destination wrapper used for both the publish and the
#               receive.
# message     - payload to publish.
# options     - Hash passed on to publish and receive (never modified by
#               this method):
#               :timeout    - milliseconds to wait, default 10000 (10s)
#               :properties - extra message properties; "synchronous" is
#                             always set to "true"
#               Any :selector supplied by the caller is replaced.
#
# Returns whatever receive returns.
def publish_and_receive(destination, message, options = {})
  # Copy first so the caller's hash (possibly frozen or reused across
  # calls) is left untouched.
  options = options.dup
  options[:timeout] ||= 10_000 # 10s
  options[:properties] = ( options[:properties] || {} ).merge( "synchronous" => "true" )
  request = publish(destination, message, options)

  # Only accept a reply correlated with the request that was just sent.
  options[:selector] = "JMSCorrelationID='#{request.jms_message.jms_message_id}'"
  receive(destination, options)
end

#queue_for(name) ⇒ Object



38
39
40
# File 'lib/torquebox/messaging/session.rb', line 38

# Builds a Queue wrapper for the named queue.
#
# name - passed to the session's create_queue method; the result is
#        wrapped in a new Queue.
def queue_for(name)
  Queue.new( @jms_session.create_queue( name ) )
end

#receive(destination, options = {}) ⇒ Object

Returns the decoded message by default. Pass :decode => false to return the original JMS TextMessage. Pass :timeout to give up after a number of milliseconds.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/torquebox/messaging/session.rb', line 75

# Receives a single message from a destination.
#
# destination - destination wrapper, resolved through java_destination.
# options     - Hash of options:
#               :decode          - decode the message via Message (default
#                                  true); false returns the raw message
#               :timeout         - handed to the consumer's receive call
#                                  (default 0)
#               :selector        - selector used when creating the consumer
#               :durable         - when truthy and the resolved destination's
#                                  class name contains "Topic", a durable
#                                  subscriber is created instead
#               :subscriber_name - durable subscriber name (default
#                                  Topic::DEFAULT_SUBSCRIBER_NAME)
#
# Yields the message to the block, if one is given, and returns the
# block's value; otherwise returns the message. Returns nil when nothing
# was received. The consumer is always closed before returning.
def receive(destination, options={})
  selector      = options[:selector]
  wait_millis   = options.fetch(:timeout, 0)
  should_decode = options.fetch(:decode, true)

  jms_destination = java_destination( destination )
  wants_durable   = options[:durable] && jms_destination.class.name =~ /Topic/

  consumer =
    if wants_durable
      subscriber = options.fetch(:subscriber_name, Topic::DEFAULT_SUBSCRIBER_NAME)
      @jms_session.createDurableSubscriber( jms_destination, subscriber, selector, false )
    else
      @jms_session.createConsumer( jms_destination, selector )
    end

  begin
    received = consumer.receive( wait_millis )
    return nil unless received

    result = should_decode ? Message.new( received ).decode : received
    return result unless block_given?

    yield(result)
  ensure
    consumer.close unless consumer.nil?
  end
end

#receive_and_publish(destination, options = {}) ⇒ Object

Receiving end of the request-response pattern. The return value of the block passed to this method is the response sent back to the client. If no block is given, the request itself is returned as the response.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/torquebox/messaging/session.rb', line 125

# Responder side of the request-response pattern: receives one message
# flagged as synchronous and publishes a reply to the same destination,
# correlated with the request's message id.
#
# destination - destination wrapper used for both the receive and the
#               publish.
# options     - Hash passed on to receive and publish (never modified by
#               this method):
#               :selector - extra selector, and-ed with the synchronous flag
#               :ttl      - reply time-to-live, default 60000 (1 minute)
#               :encoding - reply encoding, default taken from the request
#
# Yields the decoded request to the block, if one is given, and uses the
# block's value as the reply; otherwise the decoded request is echoed back.
#
# Returns whatever publish returns, or nil when no request was received.
def receive_and_publish(destination, options = {})
  # Copy first: the per-request values written below (:encoding, :ttl,
  # :correlation_id) must not stick to a hash the caller reuses for the
  # next request.
  options = options.dup

  selector = "synchronous = 'true'"
  selector = "#{selector} and (#{options[:selector]})" if options[:selector]
  # The raw message is needed for its id and encoding, so decoding is
  # done here rather than inside receive.
  receive_options = options.merge(:decode => false,
                                  :selector => selector)

  request = receive(destination, receive_options)
  return nil if request.nil?

  request_message = Message.new( request ).decode
  options[:ttl] ||= 60_000 # 1m
  options[:encoding] ||= Message.extract_encoding_from_message( request )

  response = block_given? ? yield(request_message) : request_message

  options[:correlation_id] = request.jms_message_id
  publish(destination, response, options)
end

#unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME) ⇒ Object



144
145
146
# File 'lib/torquebox/messaging/session.rb', line 144

# Removes a subscription from the wrapped session.
#
# subscriber_name - name passed to the session's unsubscribe method;
#                   defaults to Topic::DEFAULT_SUBSCRIBER_NAME.
def unsubscribe(subscriber_name = Topic::DEFAULT_SUBSCRIBER_NAME)
  @jms_session.unsubscribe( subscriber_name )
end