Class: TorqueBox::Stomp::JmsStomplet
- Inherits:
-
Object
- Object
- TorqueBox::Stomp::JmsStomplet
- Defined in:
- lib/torquebox/stomp/jms_stomplet.rb
Defined Under Namespace
Classes: MessageListener
Instance Method Summary collapse
-
#configure(stomplet_config) ⇒ Object
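Creates and starts an XA connection, opens a session on it, and records the session's XA resource. (The generated summary here is a commented-out accessor from the source: def xa_resources @xa_resources end.)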
- #destination_for(name, type) ⇒ Object
- #destroy ⇒ Object
-
#initialize ⇒ JmsStomplet
constructor
A new instance of JmsStomplet.
- #on_unsubscribe(subscriber) ⇒ Object
- #queue(name) ⇒ Object
- #send_to(destination, stomp_message, headers = {}, options = {}) ⇒ Object
- #subscribe_to(subscriber, destination, selector = nil, options = {}) ⇒ Object
- #topic(name) ⇒ Object
Constructor Details
#initialize ⇒ JmsStomplet
Returns a new instance of JmsStomplet.
22 23 24 25 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 22 def initialize() @connection_factory = TorqueBox.fetch( 'xa-connection-factory' ) @subscriptions = {} end |
Instance Method Details
#configure(stomplet_config) ⇒ Object
def xa_resources
@xa_resources
end
31 32 33 34 35 36 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 31 def configure(stomplet_config) @connection = @connection_factory.create_xa_connection @connection.start @session = @connection.create_session( false ) @xa_resources = [ @session.xa_resource ] end |
#destination_for(name, type) ⇒ Object
60 61 62 63 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 60 def destination_for(name, type) return queue(name) if ( type.to_sym == :queue ) topic(name) end |
#destroy ⇒ Object
38 39 40 41 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 38 def destroy @session.close @connection.close end |
#on_unsubscribe(subscriber) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 43 def on_unsubscribe(subscriber) subscriptions = @subscriptions.delete( subscriber ) (subscriptions || []).each do |subscription| subscription.close end end |
#queue(name) ⇒ Object
50 51 52 53 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 50 def queue(name) jms_session = @session.jms_session TorqueBox::Messaging::Queue.new( jms_session.create_queue( name ) ) end |
#send_to(destination, stomp_message, headers = {}, options = {}) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 74 def send_to(destination, stomp_message, headers={}, options={}) jms_session = @session.jms_session java_destination = @session.java_destination( destination ) producer = @session.jms_session.create_producer( java_destination.to_java ) case ( stomp_message ) when org.projectodd.stilts.stomp::StompMessage content = stomp_message.content_as_string else content = stomp_message end torquebox_message = TorqueBox::Messaging::Message.new( @session.jms_session, content, options[:encoding] ) jms_message = torquebox_message.jms_message if ( stomp_message.is_a?( org.projectodd.stilts.stomp::StompMessage ) ) stomp_message.headers.header_names.each do |name| jms_name = name.to_s.gsub( /-/, '_' ) header_value = stomp_message.headers[ name.to_s ] jms_message.setStringProperty( jms_name, header_value ) end end headers.each do |name, header_value| jms_name = name.to_s.gsub( /-/, '_' ) jms_message.setStringProperty( jms_name, header_value.to_s ) end producer.send( jms_message ) end |
#subscribe_to(subscriber, destination, selector = nil, options = {}) ⇒ Object
65 66 67 68 69 70 71 72 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 65 def subscribe_to(subscriber, destination, selector=nil, options={}) jms_session = @session.jms_session java_destination = @session.java_destination( destination ) consumer = @session.jms_session.create_consumer( java_destination.to_java, selector ) consumer.message_listener = MessageListener.new( subscriber, options ) @subscriptions[ subscriber ] ||= [] @subscriptions[ subscriber ] << consumer end |
#topic(name) ⇒ Object
55 56 57 58 |
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 55 def topic(name) jms_session = @session.jms_session TorqueBox::Messaging::Topic.new( jms_session.create_topic( name ) ) end |