Class: StompMessage::StompSendTopic

Inherits:
Object
  • Object
show all
Includes:
JmsTools
Defined in:
lib/stomp_message/stomp_send_topic.rb

Overview

This class manages sending and receiving messages. It uses a passed code block to handle each received message.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from JmsTools

#define_source_id, #jms_close_producer_session, #jms_create_consumer, #jms_create_consumer_session, #jms_create_destination_connection, #jms_create_producer, #jms_create_producer_session, #jms_create_session, #jms_kill_logging, #jms_manage_headers, #jms_message_for_me, #jms_my_id, #jms_next_transaction_id, #jms_on_message, #jms_persistent, #jms_send_ack, #jms_send_message, #jms_set_debug, #jms_shutdown, #jms_start, #unique_source_id

Constructor Details

#initialize(options = {}) ⇒ StompSendTopic

Returns a new instance of StompSendTopic.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/stomp_message/stomp_send_topic.rb', line 20

# Builds a sender/receiver for one topic.
#
# Connection settings are read from +options+ by init_vars. When
# RUBY_PLATFORM matches /java/ the JMS destination and connection for
# options[:topic] are set up as well; otherwise no connection is opened here.
#
# @param options [Hash] settings consumed by init_vars
#   (:login, :password, :host, :port, :topic, :url)
# @return [StompSendTopic] a new instance of StompSendTopic
def initialize(options={})
  @close_ok = false
  init_vars(options)
  # Both spellings are kept in sync; java? reads @javaflag.
  @javaflag = @java_flag = (RUBY_PLATFORM =~ /java/ ? true : false)
  # Note: this passes the raw option, not the defaulted self.topic.
  set_up_jms(options[:topic]) if @javaflag
  # "using url as flag wrong" (original note): the Stomp client is no longer
  # created here based on url being nil.
  @debug = false

  # @debug was cleared just above, so this line is inert unless that
  # assignment is changed while debugging.
  puts "#{self.class}: Initialized host is: #{self.host} port is #{self.port} topic is #{self.topic} login #{self.login} pass: #{self.password}" if @debug
  # setup_auto_close is intentionally not called from the constructor.
end

Instance Attribute Details

#connObject

Returns the value of attribute conn.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @conn instance variable.
def conn
  @conn
end

#hostObject

Returns the value of attribute host.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @host instance variable.
def host
  @host
end

#loginObject

Returns the value of attribute login.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @login instance variable.
def login
  @login
end

#passwordObject

Returns the value of attribute password.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @password instance variable.
def password
  @password
end

#portObject

Returns the value of attribute port.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @port instance variable.
def port
  @port
end

#topicObject

Returns the value of attribute topic.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @topic instance variable.
def topic
  @topic
end

#urlObject

Returns the value of attribute url.



17
18
19
# File 'lib/stomp_message/stomp_send_topic.rb', line 17

# Reader for the @url instance variable.
def url
  @url
end

Class Method Details

.open_connection(old_conn, login, pass, host, port) ⇒ Object

manage timeout etc…



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/stomp_message/stomp_send_topic.rb', line 93

# Opens a new Stomp client connection, retrying when an attempt times out.
#
# Any connection passed in is closed first. Each attempt is limited to
# 15 seconds and at most 4 attempts are made. Exceptions other than
# Timeout::Error raised while connecting are not rescued here.
#
# @param old_conn [Object, nil] existing connection to close, or nil
# @param login [String] login passed to Stomp::Client.new
# @param pass [String] password passed to Stomp::Client.new
# @param host [String] broker host
# @param port [String, Integer] broker port
# @return [Stomp::Client] the newly opened client
# @raise [RuntimeError] 'connection not established' when every attempt timed out
def self.open_connection(old_conn,login,pass,host,port)
  conn = old_conn
  count = 0
  flag = false
  begin
    # On a retry conn is already nil (it is cleared inside the timeout
    # block), so this only ever closes the caller's old connection.
    conn.close unless conn.nil?
    count += 1
    Timeout.timeout(15) do
      conn = nil
      conn = Stomp::Client.new(login, pass, host, port, false)
      flag = true
    end
  rescue Timeout::Error
    puts "Timeout error: exception retrying flag is: #{flag} retry number #{count}"
    # Bounded retry: give up after the fourth attempt.
    retry if !flag && count < 4
  end
  raise 'connection not established' if conn.nil?
  conn
end

.send_email(from, to, msg) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/stomp_message/stomp_send_topic.rb', line 278

# Delivers one already-formatted mail message over SMTP.
#
# The SMTP relay is chosen from the local hostname: the two known machines
# use 'mail2.cure.com.ph', every other host uses 'localhost'.
#
# @param from [String] envelope sender
# @param to [String] envelope recipient
# @param msg [String] complete message text handed to Net::SMTP#send_message
# @return [Object] whatever the Net::SMTP.start block returns
def self.send_email(from,to,msg)
  relay_machines = ["svbalance.cure.com.ph", "Scotts-Computer.local"]
  smtp_host = relay_machines.include?(Socket.gethostname) ? 'mail2.cure.com.ph' : 'localhost'

  Net::SMTP.start(smtp_host) do |smtp|
    smtp.send_message(msg, from, to)
  end
end

.send_email_stomp(from, from_alias, to, subject, message) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/stomp_message/stomp_send_topic.rb', line 263

# Formats a command/response report as a mail message and sends it to each
# recipient individually via StompSendTopic.send_email.
#
# @param from [String] sender address
# @param from_alias [String] display name placed in the From header
# @param to [String] comma-separated recipient addresses
# @param subject [String] subject text; also repeated in the body
# @param message [String] response text placed in the body
# @return [Array<String>] the recipient addresses that were mailed
def self.send_email_stomp(from, from_alias, to, subject, message)
  # The empty line after Subject ends the header section; without it the
  # body lines are parsed as (malformed) headers by mail software.
  msg = <<EOF__RUBY_END_OF_MESSAGE
From: #{from_alias} <#{from}>
To: #{to}
Subject: #{subject} #{Time.now}

Comand ----------
#{subject}
RESPONSE -----------
#{message}
EOF__RUBY_END_OF_MESSAGE
  # Tolerate "a@x, b@x" and trailing commas in the recipient list.
  recipients = to.split(',').map { |r| r.strip }.reject { |r| r.empty? }
  recipients.each { |r| StompMessage::StompSendTopic.send_email(from, r, msg) }
end

Instance Method Details

#close_topicObject

close the topic



119
120
121
122
# File 'lib/stomp_message/stomp_send_topic.rb', line 119

# Unsubscribes the Stomp connection from this object's topic.
#
# Does nothing when there is no connection or when running in Java mode.
# The connection itself is left open and is not cleared.
#
# @return [Object, nil] the result of unsubscribe, or nil when skipped
def close_topic
  return if self.conn.nil?
  return if self.java?

  self.conn.unsubscribe(self.topic)
end

#create_consumerObject



208
209
210
# File 'lib/stomp_message/stomp_send_topic.rb', line 208

# Creates a JMS consumer and its session for the stored destination and
# connection, keeping them in @consumer and @consumer_session.
#
# @return [Object] the value returned by jms_create_consumer_session
def create_consumer
  consumer_and_session = jms_create_consumer_session(@jms_dest, @jms_conn)
  @consumer, @consumer_session = consumer_and_session
end

#disconnect_stompObject

disconnect the connection



124
125
126
127
128
129
130
131
132
133
# File 'lib/stomp_message/stomp_send_topic.rb', line 124

# Disconnects: closes the topic (Stomp mode only) and then closes @jms_conn
# when one is present.
#
# NOTE(review): in Stomp mode this closes @jms_conn rather than the Stomp
# connection held in conn; confirm whether conn should be closed here too.
#
# @return [Object, nil] the result of closing @jms_conn in Stomp mode; nil in Java mode
def disconnect_stomp
  if self.java?
    @jms_conn.close unless @jms_conn.nil?
    # Was `put`, which is not a method and raised NoMethodError on this path.
    puts "INVESTIGATE THIS diconnect stomp if java in stomp_send_topic.rb"
  else
    close_topic
    puts "#{self.class}  closing connection #{@jms_conn.inspect}" if @debug
    @jms_conn.close unless @jms_conn.nil?
  end
end

#init_vars(options) ⇒ Object



69
70
71
72
73
74
75
76
77
78
# File 'lib/stomp_message/stomp_send_topic.rb', line 69

# Copies connection settings from +options+ into this object's attributes,
# substituting a default for each option that is nil or absent, and clears
# conn.
#
# Defaults: login '', password '', host 'localhost', port '61613' (a String),
# topic '/topic/undefined', url nil.
#
# @param options [Hash] may contain :login, :url, :password, :host, :port, :topic
# @return [nil]
def init_vars(options)
  # Only nil triggers a default; an explicit false is kept as given.
  self.login    = options[:login].nil? ? '' : options[:login]
  self.url      = options[:url]
  self.password = options[:password].nil? ? '' : options[:password]
  self.host     = options[:host].nil? ? 'localhost' : options[:host]
  self.port     = options[:port].nil? ? '61613' : options[:port]
  self.topic    = options[:topic].nil? ? '/topic/undefined' : options[:topic]
  self.conn     = nil
  puts "self url is #{self.url}" if @debug
end

#interim_package(msg, headers, timeout) ⇒ Object

send_sms



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/stomp_message/stomp_send_topic.rb', line 180

# Sends +msg+ through send_topic_acknowledge and waits for the reply.
#
# The reply callback parses the received text with
# StompMessage::Message.load_xml and stores its body in +result+. After
# sending, this method polls once per second until the callback has run.
# The whole exchange is wrapped in a timeout of +timeout+ + 1 seconds, and
# send_topic_acknowledge is given +timeout+ - 1.
#
# SystemExit and Timeout::Error are swallowed silently; any other exception
# is printed. In every one of those cases the current +result+ is returned.
#
# @param msg [Object] message passed on to send_topic_acknowledge
# @param headers [Hash] headers passed on to send_topic_acknowledge
# @param timeout [Numeric] base wait time in seconds
# @return [Object, false] the reply body, or false when no reply was processed
def interim_package(msg,headers,timeout)
  result=false
  msg_received_flag =false
  begin
  Timeout::timeout(timeout+1) {
    # The block parameter below is also named msg; inside the block it is the received reply text.
    self.send_topic_acknowledge(msg,headers,timeout-1)  {   |msg| # puts 'in handle action block' 
                        #  puts "MESSAGE RECEIVED ---- #{msg.to_s} "
                          msg_received_flag=true
                          m=StompMessage::Message.load_xml(msg)
                          result=m.body
                       #    result= yield m.body if block_given? FIGURE OUT HOW TO MAKE THIS WORK
                        #  puts "result is #{result}"
                          result
                           }
                
                           # Poll until the reply callback has set the flag; the enclosing timeout bounds this loop.
                           while true  
                          #    putc '.'
                              break if msg_received_flag
                              sleep(1)
                              end  }
                        # Process exit and timeout are deliberately ignored: fall through and return result.
                        rescue SystemExit
                        rescue Timeout::Error
                        rescue Exception => e
                         puts "exception #{e.message} class: #{e.class}"
                         puts  "no receipt"
                        end
             result
end

#java?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/stomp_message/stomp_send_topic.rb', line 79

# Reports the @javaflag instance variable.
def java?
  @javaflag
end

#jms_message_handling(tjms_dest, tjms_conn) ⇒ Object



157
158
159
160
161
162
163
164
165
166
# File 'lib/stomp_message/stomp_send_topic.rb', line 157

# Runs the given block with a JMS producer/session (stored in @producer and
# @session) and a temporary consumer open, then releases them.
#
# The consumer is created on @jms_dest, not on +tjms_dest+. Cleanup runs in
# ensure blocks so the consumer and producer session are released even when
# the block raises; the exception still propagates to the caller.
#
# @param tjms_dest [Object] destination passed to jms_create_producer_session
# @param tjms_conn [Object] connection passed to jms_create_producer_session
# @yield the work to perform while the producer, session and consumer are open
# @return [Object] the block's result
def jms_message_handling(tjms_dest,tjms_conn)
  @my_conn = @jms_conn
  @producer, @session = jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    tconsumer = jms_create_consumer(@session, @jms_dest)
    begin
      yield
    ensure
      tconsumer.close
    end
  ensure
    jms_close_producer_session(@producer, @session)
  end
end

#jms_message_one_way(tjms_dest, tjms_conn) ⇒ Object



147
148
149
150
151
152
153
154
155
156
# File 'lib/stomp_message/stomp_send_topic.rb', line 147

# Runs the given block with a JMS producer/session (stored in @producer and
# @session) open, then closes them. No consumer is created.
#
# The producer session is closed in an ensure block so it is released even
# when the block raises; the exception still propagates to the caller.
#
# @param tjms_dest [Object] destination passed to jms_create_producer_session
# @param tjms_conn [Object] connection passed to jms_create_producer_session
# @yield the work to perform while the producer and session are open
# @return [Object] the block's result
def jms_message_one_way(tjms_dest,tjms_conn)
  @producer, @session = jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    yield
  ensure
    jms_close_producer_session(@producer, @session)
  end
end

#jms_msg_result(msg) ⇒ Object

setup_auto_close



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/stomp_message/stomp_send_topic.rb', line 42

# Sends +msg+ (serialized with to_xml) over JMS via jms_send_ack with a fixed
# timeout argument of 50, inside jms_message_handling for the stored
# destination and connection.
#
# @param msg [#to_xml] message to send
# @return [Object] whatever jms_message_handling returns
def jms_msg_result(msg)
  @my_conn = @jms_conn
  jms_message_handling(@jms_dest, @jms_conn) do
    jms_send_ack(@session, @producer, msg.to_xml, 50)
  end
end

#open_connectionObject



115
116
117
# File 'lib/stomp_message/stomp_send_topic.rb', line 115

# Opens the Stomp connection if this object does not have one yet, using the
# class-level StompSendTopic.open_connection with this object's login,
# password, host and port.
#
# @return [Object, nil] the new connection, or nil when one already existed
def open_connection
  return unless self.conn.nil?

  self.conn = StompMessage::StompSendTopic.open_connection(@conn, self.login, self.password, self.host, self.port)
end

#post_stomp(msg, headers) ⇒ Object

post stomp message to url



135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/stomp_message/stomp_send_topic.rb', line 135

# Posts +msg+ (serialized with to_xml) over HTTP to this object's url on
# host:port.
#
# The request headers are "Content-type: text/xml" combined with +headers+;
# entries in +headers+ win on conflict. Hash#merge returns a new hash, so its
# result must be kept (the previous code discarded it and never sent the
# caller's headers). The block form of Net::HTTP.start closes the connection
# once the request completes.
#
# @param msg [#to_xml] message to post
# @param headers [Hash] extra request headers
# @return [Object] the response returned by the HTTP post
def post_stomp(msg,headers)
  request_header = { "Content-type" => "text/xml" }.merge(headers)
  url = self.url # + "/" + self.topic
  body = msg.to_xml
  puts "posting to: #{self.host}: #{self.port} #{url} message: #{body}"
  r = Net::HTTP.start(self.host, self.port) { |ht| ht.post(url, body, request_header) }

  puts "result: #{r.to_s}"
  r
end

#send_email_stomp(from, from_alias, to, subject, message) ⇒ Object

send_sms



260
261
262
# File 'lib/stomp_message/stomp_send_topic.rb', line 260

# Instance-level convenience wrapper: forwards all arguments unchanged to
# StompMessage::StompSendTopic.send_email_stomp.
#
# @return [Object] whatever the class-level method returns
def send_email_stomp(from, from_alias, to, subject, message)
  mailer = StompMessage::StompSendTopic
  mailer.send_email_stomp(from, from_alias, to, subject, message)
end

#send_topic(msg, headers, &r_block) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/stomp_message/stomp_send_topic.rb', line 167

# Sends +msg+ (serialized with to_xml) to this object's topic.
#
# In Java mode the message goes through JMS with the caller's +headers+ as
# given. Otherwise it is sent on the Stomp connection with
# 'persistent' => 'false' combined with +headers+; entries in +headers+ win
# on conflict. Hash#merge returns a new hash, so its result must be kept
# (the previous code discarded it and the caller's headers were never sent).
#
# @param msg [#to_xml] message to send
# @param headers [Hash] extra message headers
# @param r_block [Proc] optional block forwarded to the Stomp client's send
# @return [Object] result of the underlying send
def send_topic(msg, headers, &r_block)
  open_connection
  more_headers = { 'persistent' => 'false' }.merge(headers)
  if self.java?
    jms_message_handling(@jms_dest, @jms_conn) { self.jms_send_message(@session, @producer, headers, msg.to_xml) }
  else
    self.conn.send(self.topic, msg.to_xml, more_headers, &r_block)
  end
end

#send_topic_ack(msg, headers, timeout = 75, &r_block) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/stomp_message/stomp_send_topic.rb', line 211

# Sends +msg+ and waits for a reply.
#
# In Java mode the message is sent with jms_send_ack inside
# jms_message_handling; otherwise the work is delegated to interim_package.
# The +r_block+ parameter is accepted but not used.
#
# @param msg [#to_xml] message to send
# @param headers [Hash] headers (used on the non-Java path only)
# @param timeout [Numeric] timeout passed on to the chosen sender
# @return [Object] the result produced by the chosen sender
def send_topic_ack(msg,headers,timeout=75, &r_block)
  return interim_package(msg, headers, timeout) unless self.java?

  jms_message_handling(@jms_dest, @jms_conn) do
    jms_send_ack(@session, @producer, msg.to_xml, timeout)
  end
end

#send_topic_acknowledge(msg, headers, timeout = 60) ⇒ Object

Be careful with the receipt topic calculations… strange errors on ActiveMQ.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/stomp_message/stomp_send_topic.rb', line 226

# Sends +msg+ to this object's topic over Stomp and arranges for the reply.
#
# A receipt topic named "/topic/rcpt_client<N>" (N in 0..29) is subscribed to
# before sending, and its name is sent as the 'reply-to' header. When a reply
# arrives it is acknowledged, its body is yielded to the caller's block, and
# the receipt topic is unsubscribed. A background thread unsubscribes after
# +timeout+ + 1 seconds if no reply was handled.
#
# Caller +headers+ are combined with 'persistent' => 'false' (caller wins).
# Hash#merge returns a new hash, so its result must be kept; the previous
# code discarded it and never sent the caller's headers. 'reply-to' is
# applied last so it always names the topic subscribed to here.
#
# @param msg [#to_xml] message to send
# @param headers [Hash] extra message headers
# @param timeout [Numeric] seconds allowed for handling a reply
# @yield [String] body of the reply message
# @return [Thread] the background cleanup thread
def send_topic_acknowledge(msg, headers, timeout=60)
  open_connection
  # Kept to 30 distinct names on purpose: the original note says 1000 seemed
  # to create connections on ActiveMQ, and "/topic/receipt/client…" style
  # names made JMS blow up.
  receipt_topic = "/topic/rcpt_client#{(rand * 30).to_i}"
  receipt_flag = false
  self.conn.subscribe(receipt_topic, { :ack => "client" }) do |reply|
    begin
      Timeout::timeout(timeout) do
        self.conn.acknowledge(reply, reply.headers)
        yield reply.body
      end
    rescue Exception => e
      # Broad rescue kept as in the original so a failure in the caller's
      # block never escapes into the Stomp listener.
      puts "exception #{e.message}"
    ensure
      receipt_flag = true
      self.conn.unsubscribe receipt_topic
    end
  end

  more_headers = { 'persistent' => 'false' }.merge(headers).merge('reply-to' => "#{receipt_topic}")
  self.conn.send(self.topic, msg.to_xml, more_headers)
  Thread.new do
    sleep(timeout + 1)
    puts "calling unsubscribe on #{receipt_topic}" if !receipt_flag
    self.conn.unsubscribe receipt_topic if !receipt_flag
  end
end

#send_topic_jms_shutdownObject

at_exit { puts "#{self.class}: auto close exit block" # if @debug

send_topic_jms_shutdown }


65
66
67
# File 'lib/stomp_message/stomp_send_topic.rb', line 65

# Passes the stored JMS destination, connection, session, producer and
# consumer to jms_shutdown.
def send_topic_jms_shutdown
  jms_shutdown(@jms_dest, @jms_conn, @session, @producer, @consumer)
end

#set_up_jms(topic) ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/stomp_message/stomp_send_topic.rb', line 56

# Starts JMS and stores the destination and connection for +topic+ in
# @jms_dest and @jms_conn. Sessions, producers and consumers are not created
# here, and no at_exit shutdown hook is registered.
#
# @param topic [Object] topic passed to jms_create_destination_connection
# @return [Object] the value returned by jms_create_destination_connection
def set_up_jms(topic)
  # The connection factory name is hard-coded; the original note asks for it
  # to become a variable.
  jms_start("TopicConnectionFactory")
  destination_and_connection = jms_create_destination_connection(topic)
  @jms_dest, @jms_conn = destination_and_connection
end

#setup_auto_closeObject

only call this once



83
84
85
86
87
88
89
90
91
# File 'lib/stomp_message/stomp_send_topic.rb', line 83

# Registers an at_exit hook that, outside Java mode, closes the topic and
# disconnects. The hook is registered only while @close_ok is still falsy;
# @close_ok is then set so later calls register nothing.
#
# @return [true]
def setup_auto_close
  unless @close_ok
    at_exit do
      puts "#{self.class}: auto close exit block" if @debug
      unless self.java?
        close_topic
        disconnect_stomp
      end
    end
  end
  @close_ok = true
end