Class: ActiveMessaging::Adapters::Jms::Connection

Inherits:
Object
  • Object
show all
Includes:
ActiveMessaging::Adapter
Defined in:
lib/activemessaging/adapters/jms.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg = {}) ⇒ Connection

Returns a new instance of Connection.



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/activemessaging/adapters/jms.rb', line 23

# Builds a connection factory from the config, then opens a connection and a
# session on it and starts the connection.
#
# @param cfg [Hash] reads :url, :login and :passcode, plus either
#   :connection_factory (a Java class path, instantiated with login, passcode
#   and url) or :jndi (a name looked up through javax.naming.InitialContext)
# @raise [RuntimeError] if neither :connection_factory nor :jndi is present,
#   or if the resulting factory is nil
def initialize cfg={}
  @url, @login, @passcode = cfg[:url], cfg[:login], cfg[:passcode]

  @connection_factory =
    if cfg.has_key? :connection_factory
      # This construction is probably ActiveMQ specific; there might be a more
      # generic way of getting a factory without resorting to a JNDI lookup.
      # NOTE(review): the config value is interpolated into evaluated Ruby
      # source, so this is only safe while the config is trusted.
      eval("Java::#{cfg[:connection_factory]}.new(@login, @passcode, @url)")
    elsif cfg.has_key? :jndi
      javax.naming.InitialContext.new().lookup(cfg[:jndi])
    else
      raise "Either jndi or connection_factory has to be set in the config."
    end
  raise "Connection factory could not be initialized." if @connection_factory.nil?

  @connection = @connection_factory.create_connection()
  # Non-transacted session; the literal 1 is presumably
  # javax.jms.Session::AUTO_ACKNOWLEDGE -- TODO confirm.
  @session = @connection.createSession(false, 1)
  @destinations = []
  @producers = {}
  @consumers = {}
  @connection.start
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



21
22
23
# File 'lib/activemessaging/adapters/jms.rb', line 21

# Reader for @connection, the object obtained from the connection factory in
# #initialize. It is nil after #close has run.
#
# @return [Object, nil] the current connection
def connection
  @connection
end

#consumers ⇒ Object

Returns the value of attribute consumers.



21
22
23
# File 'lib/activemessaging/adapters/jms.rb', line 21

# Reader for @consumers, the hash of consumers keyed by queue name that
# #find_or_create_consumer fills and #unsubscribe removes entries from.
#
# @return [Hash] queue name => consumer
def consumers
  @consumers
end

#producers ⇒ Object

Returns the value of attribute producers.



21
22
23
# File 'lib/activemessaging/adapters/jms.rb', line 21

# Reader for @producers, the hash that #initialize and #close reset to {}
# and that #find_or_create_producer looks producers up in by queue name.
#
# @return [Hash] queue name => producer
def producers
  @producers
end

#reliable ⇒ Object

Returns the value of attribute reliable.



21
22
23
# File 'lib/activemessaging/adapters/jms.rb', line 21

# Reader for @reliable.
# NOTE(review): none of the methods shown on this page assign @reliable, so
# this returns nil unless it is set elsewhere -- confirm against the full file.
#
# @return [Object, nil] the value of @reliable
def reliable
  @reliable
end

#session ⇒ Object

Returns the value of attribute session.



21
22
23
# File 'lib/activemessaging/adapters/jms.rb', line 21

# Reader for @session, the session created from the connection in
# #initialize. It is nil after #close has run.
#
# @return [Object, nil] the current session
def session
  @session
end

Instance Method Details

#close ⇒ Object



117
118
119
120
121
122
123
124
125
126
# File 'lib/activemessaging/adapters/jms.rb', line 117

# Shuts the adapter down: closes every cached consumer, stops the connection,
# closes the session and the connection, then clears all cached state.
#
# Consumers are released with +close+, the same call #unsubscribe uses on
# them, rather than +stop+. Calling this method again after a successful
# close is a no-op instead of failing on the nil connection.
#
# @return [Hash] the fresh, empty producers hash
def close
  @consumers.each_value { |consumer| consumer.close }

  unless @connection.nil?
    @connection.stop
    @session.close unless @session.nil?
    @connection.close
  end

  @connection = nil
  @session = nil
  @consumers = {}
  @producers = {}
end

#find_or_create_consumer(queue_name, headers = {}) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/activemessaging/adapters/jms.rb', line 137

# Returns the consumer cached for +queue_name+, creating and caching one on
# the session when none exists yet.
#
# @param queue_name [Object] key used for the consumer cache and passed on to
#   #find_or_create_destination
# @param headers [Hash] passed to #find_or_create_destination; a :selector
#   entry (string or symbol key) is forwarded to the session as second argument
# @return [Object] the cached or newly created consumer
def find_or_create_consumer queue_name, headers={}
  cached = @consumers[queue_name]
  return cached unless cached.nil?

  destination = find_or_create_destination queue_name, headers
  options = headers.symbolize_keys
  @consumers[queue_name] =
    if options.has_key? :selector
      @session.create_consumer destination, options[:selector]
    else
      @session.create_consumer destination
    end
end

#find_or_create_destination(queue_name, headers = {}) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/activemessaging/adapters/jms.rb', line 152

# Looks up a destination for +queue_name+ via #find_destination and, when none
# is found, creates a topic or queue on the session and records it in
# @destinations.
#
# The destination type is read from the symbolized headers once and that same
# value is used for the lookup, the creation branch and the error message, so
# string-keyed headers ("destination_type") behave like symbol-keyed ones.
#
# @param queue_name [Object] destination name; converted with to_s on creation
# @param headers [Hash] must carry :destination_type of :queue or :topic when
#   the destination does not exist yet
# @return [Object] the found or newly created destination
# @raise [RuntimeError] if a destination has to be created and the type is
#   neither :queue nor :topic
def find_or_create_destination queue_name, headers={}
  destination_type = headers.symbolize_keys[:destination_type]
  destination = find_destination queue_name, destination_type
  return destination unless destination.nil?

  destination =
    case destination_type
    when :topic then @session.create_topic(queue_name.to_s)
    when :queue then @session.create_queue(queue_name.to_s)
    else
      raise "headers[:destination_type] must be either :queue or :topic.  was #{destination_type}"
    end
  @destinations << destination
  destination
end

#find_or_create_producer(queue_name, headers = {}) ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/activemessaging/adapters/jms.rb', line 128

# Returns the producer cached for +queue_name+, creating one on the session
# when none exists yet.
#
# A newly created producer is stored in @producers, mirroring what
# #find_or_create_consumer does for consumers; otherwise the cache lookup at
# the top could never hit and every call would create another producer.
#
# @param queue_name [Object] cache key, also passed to
#   #find_or_create_destination
# @param headers [Hash] passed to #find_or_create_destination
# @return [Object] the cached or newly created producer
def find_or_create_producer queue_name, headers={}
  producer = @producers[queue_name]
  if producer.nil?
    destination = find_or_create_destination queue_name, headers
    producer = @session.create_producer destination
    @producers[queue_name] = producer
  end
  producer
end

#receive(options = {}) ⇒ Object



89
90
91
92
93
# File 'lib/activemessaging/adapters/jms.rb', line 89

# Option-hash front end for #receive_message.
#
# @param options [Hash] :queue_name (may be absent) and :headers (defaults to
#   an empty hash when absent or nil)
# @return [Object] whatever #receive_message returns
def receive(options={})
  receive_message(options[:queue_name], options[:headers] || {})
end

#receive_message(queue_name = nil, headers = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/activemessaging/adapters/jms.rb', line 95

# Fetches one message.
#
# Without a queue name, every cached consumer is asked in turn and the first
# non-nil message is returned (after #condition_message); nil is returned when
# none of them has one. With a queue name, a consumer is obtained through
# #subscribe, asked once, and released again through #unsubscribe.
#
# The unsubscribe runs in an +ensure+ so the consumer is released even when
# the receive call raises.
#
# @param queue_name [Object, nil] queue to read from, or nil to poll the
#   cached consumers
# @param headers [Hash] passed to #subscribe and #unsubscribe
# @return [Object, nil] the result of #condition_message, or nil when polling
#   found nothing
def receive_message(queue_name=nil, headers={})
  if queue_name.nil?
    @consumers.each_value do |consumer|
      # The argument 1 is presumably a timeout in milliseconds -- TODO confirm.
      message = consumer.receive(1)
      return condition_message(message) unless message.nil?
    end
    nil
  else
    consumer = subscribe(queue_name, headers)
    begin
      message = consumer.receive(1)
    ensure
      unsubscribe(queue_name, headers)
    end
    condition_message(message)
  end
end

#received(message, headers = {}) ⇒ Object



109
110
111
# File 'lib/activemessaging/adapters/jms.rb', line 109

# Accepts a message and optional headers and intentionally does nothing with
# them; the body is empty, so the method returns nil.
#
# @param message [Object] ignored
# @param headers [Hash] ignored
# @return [nil]
def received message, headers={}
  #do nothing
end

#send(queue_name, body, headers = {}) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/activemessaging/adapters/jms.rb', line 63

# Creates a text message with +body+, copies +headers+ onto it and hands it to
# the producer for +queue_name+.
#
# Recognised header names (after stringifying the keys) are mapped to the
# matching JMS setter; every other header is stored as a string property.
#
# NOTE(review): the reply-to value is passed as a String, and any truthy value
# for 'persistent'/'JMSDeliveryMode' selects mode 2 -- confirm both against the
# JMS API before relying on them.
#
# @param queue_name [Object] destination name, normalised by
#   #check_destination_type
# @param body [Object] payload given to the session's create_text_message
# @param headers [Hash] message headers; also passed (symbolized) to
#   #find_or_create_producer
# @return [Object] whatever the producer's send returns
def send queue_name, body, headers={}
  destination_name = check_destination_type queue_name, headers
  producer = find_or_create_producer destination_name, headers.symbolize_keys
  jms_message = @session.create_text_message body

  headers.stringify_keys.each do |name, value|
    case name
    when 'id', 'message-id', 'JMSMessageID'
      jms_message.setJMSMessageID value.to_s
    when 'correlation-id', 'JMSCorrelationID'
      jms_message.setJMSCorrelationID value.to_s
    when 'expires', 'JMSExpiration'
      jms_message.setJMSExpiration value.to_i
    when 'persistent', 'JMSDeliveryMode'
      jms_message.setJMSDeliveryMode(value ? 2 : 1)
    when 'priority', 'JMSPriority'
      jms_message.setJMSPriority value.to_i
    when 'reply-to', 'JMSReplyTo'
      jms_message.setJMSReplyTo value.to_s
    when 'type', 'JMSType'
      jms_message.setJMSType value.to_s
    else # is this the most appropriate thing to do here?
      jms_message.set_string_property name, value.to_s
    end
  end

  producer.send jms_message
end

#subscribe(queue_name, headers = {}) ⇒ Object



49
50
51
52
# File 'lib/activemessaging/adapters/jms.rb', line 49

# Normalises +queue_name+ through #check_destination_type and returns the
# consumer for it, creating one if needed.
#
# @param queue_name [Object] destination name
# @param headers [Hash] passed to both helper calls
# @return [Object] the consumer from #find_or_create_consumer
def subscribe queue_name, headers={}
  find_or_create_consumer(check_destination_type(queue_name, headers), headers)
end

#unreceive(message, headers = {}) ⇒ Object



113
114
115
# File 'lib/activemessaging/adapters/jms.rb', line 113

# Accepts a message and optional headers and intentionally does nothing with
# them; the body is empty, so the method returns nil.
#
# @param message [Object] ignored
# @param headers [Hash] ignored
# @return [nil]
def unreceive message, headers={}
  # do nothing
end

#unsubscribe(queue_name, headers = {}) ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/activemessaging/adapters/jms.rb', line 54

# Closes the consumer cached for +queue_name+ (after normalising the name via
# #check_destination_type) and removes it from the cache.
#
# @param queue_name [Object] destination name
# @param headers [Hash] passed to #check_destination_type
# @return [Object, nil] the removed consumer, or nil when none was cached
def unsubscribe queue_name, headers={}
  destination_name = check_destination_type queue_name, headers
  subscribed = @consumers[destination_name]
  return if subscribed.nil?

  subscribed.close
  @consumers.delete destination_name
end