Class: TorqueBox::Stomp::JmsStomplet

Inherits:
Object
  • Object
show all
Defined in:
lib/torquebox/stomp/jms_stomplet.rb

Defined Under Namespace

Classes: MessageListener

Instance Method Summary collapse

Constructor Details

#initializeJmsStomplet

Returns a new instance of JmsStomplet.



22
23
24
25
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 22

def initialize()
  @connection_factory = TorqueBox.fetch( 'xa-connection-factory' )
  @subscriptions = {}
end

Instance Method Details

#configure(stomplet_config) ⇒ Object

def xa_resources

@xa_resources

end



31
32
33
34
35
36
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 31

def configure(stomplet_config)
  @connection = @connection_factory.create_xa_connection
  @connection.start
  @session = @connection.create_session( false )
  @xa_resources = [ @session.xa_resource ]
end

#destination_for(name, type) ⇒ Object



60
61
62
63
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 60

def destination_for(name, type)
  return queue(name) if ( type.to_sym == :queue )
  topic(name)
end

#destroyObject



38
39
40
41
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 38

def destroy
  @session.close
  @connection.close
end

#on_unsubscribe(subscriber) ⇒ Object



43
44
45
46
47
48
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 43

def on_unsubscribe(subscriber)
  subscriptions = @subscriptions.delete( subscriber )
  (subscriptions || []).each do |subscription|
    subscription.close
  end
end

#queue(name) ⇒ Object



50
51
52
53
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 50

def queue(name)
  jms_session = @session.jms_session
  TorqueBox::Messaging::Queue.new( jms_session.create_queue( name ) )
end

#send_to(destination, stomp_message, headers = {}, options = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 74

def send_to(destination, stomp_message, headers={}, options={})
  jms_session = @session.jms_session
  java_destination = @session.java_destination( destination )

  producer    = @session.jms_session.create_producer( java_destination.to_java )

  case ( stomp_message )
  when org.projectodd.stilts.stomp::StompMessage
    content = stomp_message.content_as_string
  else
    content = stomp_message
  end

  encoded_message = TorqueBox::Messaging::Message.new( @session.jms_session, content, options[:encoding] )
  jms_message = encoded_message.jms_message

  if ( stomp_message.is_a?( org.projectodd.stilts.stomp::StompMessage ) )
    stomp_message.headers.header_names.each do |name|
      jms_name = name.to_s.gsub( /-/, '_' )
      header_value = stomp_message.headers[ name.to_s ]
      jms_message.setStringProperty( jms_name, header_value )
    end
  end

  headers.each do |name, header_value|
    jms_name = name.to_s.gsub( /-/, '_' )
    jms_message.setStringProperty( jms_name, header_value.to_s )
  end

  producer.send( jms_message )
end

#subscribe_to(subscriber, destination, selector = nil, options = {}) ⇒ Object



65
66
67
68
69
70
71
72
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 65

def subscribe_to(subscriber, destination, selector=nil, options={})
  jms_session = @session.jms_session
  java_destination = @session.java_destination( destination )
  consumer = @session.jms_session.create_consumer( java_destination.to_java, selector )
  consumer.message_listener = MessageListener.new( subscriber, options )
  @subscriptions[ subscriber ] ||= []
  @subscriptions[ subscriber ] << consumer
end

#topic(name) ⇒ Object



55
56
57
58
# File 'lib/torquebox/stomp/jms_stomplet.rb', line 55

def topic(name)
  jms_session = @session.jms_session
  TorqueBox::Messaging::Topic.new( jms_session.create_topic( name ) )
end