Class: StompMessage::StompSendTopic
- Inherits:
-
Object
- Object
- StompMessage::StompSendTopic
- Includes:
- JmsTools
- Defined in:
- lib/stomp_message/stomp_send_topic.rb
Overview
This class manages sending and receiving messages. It uses a passed code block to process each received message.
Instance Attribute Summary collapse
-
#conn ⇒ Object
Returns the value of attribute conn.
-
#host ⇒ Object
Returns the value of attribute host.
-
#login ⇒ Object
Returns the value of attribute login.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#topic ⇒ Object
Returns the value of attribute topic.
-
#url ⇒ Object
Returns the value of attribute url.
Class Method Summary collapse
-
.open_connection(old_conn, login, pass, host, port) ⇒ Object
manage timeout etc…
- .send_email(from, to, msg) ⇒ Object
- .send_email_stomp(from, from_alias, to, subject, message) ⇒ Object
Instance Method Summary collapse
-
#close_topic ⇒ Object
close the topic.
- #create_consumer ⇒ Object
-
#disconnect_stomp ⇒ Object
disconnect the connection.
- #init_vars(options) ⇒ Object
-
#initialize(options = {}) ⇒ StompSendTopic
constructor
A new instance of StompSendTopic.
-
#interim_package(msg, headers, timeout) ⇒ Object
send_sms.
- #java? ⇒ Boolean
- #jms_message_handling(tjms_dest, tjms_conn) ⇒ Object
- #jms_message_one_way(tjms_dest, tjms_conn) ⇒ Object
-
#jms_msg_result(msg) ⇒ Object
setup_auto_close.
- #open_connection ⇒ Object
-
#post_stomp(msg, headers) ⇒ Object
post stomp message to url.
-
#send_email_stomp(from, from_alias, to, subject, message) ⇒ Object
send_sms.
- #send_topic(msg, headers, &r_block) ⇒ Object
- #send_topic_ack(msg, headers, timeout = 75, &r_block) ⇒ Object
-
#send_topic_acknowledge(msg, headers, timeout = 60) ⇒ Object
be careful with the receipt topic calculations…
-
#send_topic_jms_shutdown ⇒ Object
at_exit { puts "#{self.class}: auto close exit block" #if @debug send_topic_jms_shutdown }.
- #set_up_jms(topic) ⇒ Object
-
#setup_auto_close ⇒ Object
only call this once.
Methods included from JmsTools
#define_source_id, #jms_close_producer_session, #jms_create_consumer, #jms_create_consumer_session, #jms_create_destination_connection, #jms_create_producer, #jms_create_producer_session, #jms_create_session, #jms_kill_logging, #jms_manage_headers, #jms_message_for_me, #jms_my_id, #jms_next_transaction_id, #jms_on_message, #jms_persistent, #jms_send_ack, #jms_send_message, #jms_set_debug, #jms_shutdown, #jms_start, #unique_source_id
Constructor Details
#initialize(options = {}) ⇒ StompSendTopic
Returns a new instance of StompSendTopic.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 20
# Constructor: sets up instance state from an options hash.
#
# The hash is handed to init_vars, which assigns the connection attributes.
# When RUBY_PLATFORM matches /java/ the JMS side is also initialised through
# set_up_jms using options[:topic]. No Stomp connection is opened here.
#
# @param options [Hash] settings forwarded to init_vars; :topic is also
#   forwarded to set_up_jms on the Java platform
# @return [StompSendTopic] a new instance
def initialize(options = {})
  # set up variables using hash
  @close_ok = false
  init_vars(options)
  # Both spellings of the flag are assigned, exactly as before; java? reads @javaflag.
  @javaflag = @java_flag = !(RUBY_PLATFORM =~ /java/).nil?
  set_up_jms(options[:topic]) if @javaflag
  # Debug output is switched off here, so the line below never prints.
  @debug = false
  puts "#{self.class}: Initialized host is: #{self.host} port is #{self.port} topic is #{self.topic} login #{self.login} pass: #{self.password}" if @debug
  # setup_auto_close
end
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def conn @conn end |
#host ⇒ Object
Returns the value of attribute host.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def host @host end |
#login ⇒ Object
Returns the value of attribute login.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def login @login end |
#password ⇒ Object
Returns the value of attribute password.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def password @password end |
#port ⇒ Object
Returns the value of attribute port.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def port @port end |
#topic ⇒ Object
Returns the value of attribute topic.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def topic @topic end |
#url ⇒ Object
Returns the value of attribute url.
17 18 19 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 17 def url @url end |
Class Method Details
.open_connection(old_conn, login, pass, host, port) ⇒ Object
manage timeout etc…
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 93
# Opens a new Stomp::Client connection, closing any previous one first.
#
# Each attempt is limited to 15 seconds. After a timeout the attempt is
# repeated until four attempts have been made in total.
#
# @param old_conn [Object, nil] an existing connection to close, or nil
# @param login [String] login passed to Stomp::Client.new
# @param pass [String] password passed to Stomp::Client.new
# @param host [String] host passed to Stomp::Client.new
# @param port [String, Integer] port passed to Stomp::Client.new
# @return [Object] the newly created Stomp::Client
# @raise [RuntimeError] 'connection not established' when every attempt timed out
def self.open_connection(old_conn, login, pass, host, port)
  conn = old_conn
  count = 0
  flag = false
  begin
    conn.close unless conn.nil?
    count += 1
    Timeout.timeout(15) do
      # Cleared first so a timed-out attempt never leaves the closed connection behind.
      conn = nil
      conn = Stomp::Client.new(login, pass, host, port, false)
      flag = true
    end
  rescue Timeout::Error
    puts "Timeout error: exception retrying flag is: #{flag} retry number #{count}"
    retry if !flag && count < 4
  end
  raise 'connection not established' if conn.nil?
  conn
end
.send_email(from, to, msg) ⇒ Object
278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 278
# Sends one already-formatted mail message over SMTP.
#
# The SMTP host is chosen from the local hostname: two known machines relay
# through 'mail2.cure.com.ph', every other host uses 'localhost'.
#
# @param from [String] sender address
# @param to [String] recipient address
# @param msg [String] complete message text (headers and body)
# @return [Object] whatever the Net::SMTP.start block returns
def self.send_email(from, to, msg)
  smtp_host =
    case Socket.gethostname
    when "svbalance.cure.com.ph", "Scotts-Computer.local" then 'mail2.cure.com.ph'
    else 'localhost'
    end
  # The listing lost the method name here ("smtp.(msg, from, to)"); send_message
  # is the Net::SMTP call that takes (message, from, to).
  Net::SMTP.start(smtp_host) { |smtp| smtp.send_message(msg, from, to) }
end
.send_email_stomp(from, from_alias, to, subject, message) ⇒ Object
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 263
# Builds a plain-text mail describing a command and its response, then sends
# it to every recipient through StompSendTopic.send_email.
#
# NOTE(review): the line breaks inside the message template were lost when the
# listing was flattened; the layout below (headers, blank line, body) is a
# reconstruction - confirm against the real source file.
#
# @param from [String] sender address
# @param from_alias [String] display name used in the From: header
# @param to [String] comma-separated recipient addresses
# @param subject [String] subject line, also repeated in the body
# @param message [String] response text placed at the end of the body
# @return [Array<String>] the list of recipients the mail was sent to
def self.send_email_stomp(from, from_alias, to, subject, message)
  msg = <<EOF__RUBY_END_OF_MESSAGE
From: #{from_alias} <#{from}>
To: #{to}
Subject: #{subject}

#{Time.now}
Comand
----------
#{subject}
RESPONSE
-----------
#{message}
EOF__RUBY_END_OF_MESSAGE
  recipients = to.split(',')
  recipients.each { |r| StompMessage::StompSendTopic.send_email(from, r, msg) }
end
Instance Method Details
#close_topic ⇒ Object
close the topic
119 120 121 122 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 119
# Unsubscribes the Stomp connection from this object's topic.
#
# Nothing happens when there is no connection or when running in Java mode.
# The connection itself is left open (and is not reset to nil).
#
# @return [Object, nil] the result of unsubscribe, or nil when skipped
def close_topic
  return if self.conn.nil? || self.java?

  self.conn.unsubscribe(self.topic)
  # self.conn=nil
end
#create_consumer ⇒ Object
208 209 210 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 208
# Creates a JMS consumer and its session for the current destination and
# connection, storing them in @consumer and @consumer_session.
#
# @return [Object] the value returned by jms_create_consumer_session
def create_consumer
  consumer_pair = jms_create_consumer_session(@jms_dest, @jms_conn)
  @consumer, @consumer_session = consumer_pair
end
#disconnect_stomp ⇒ Object
disconnect the connection
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 124
# Disconnects the connection.
#
# Outside Java mode the topic is closed first; in both modes @jms_conn is
# closed when it is set. In Java mode a reminder is printed afterwards (the
# original called the non-existent `put`, which raised NoMethodError).
#
# NOTE(review): the non-Java branch closes @jms_conn, not the Stomp connection
# held in `conn` - confirm whether that is intended.
#
# @return [Object, nil]
def disconnect_stomp
  if java?
    @jms_conn.close unless @jms_conn.nil?
    puts "INVESTIGATE THIS diconnect stomp if java in stomp_send_topic.rb"
  else
    close_topic
    puts "#{self.class} closing connection #{@jms_conn.inspect}" if @debug
    @jms_conn.close unless @jms_conn.nil?
  end
end
#init_vars(options) ⇒ Object
69 70 71 72 73 74 75 76 77 78 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 69
# Copies connection settings from the options hash into the attributes,
# substituting a default for every key whose value is nil, and clears conn.
#
# Defaults: login '' / url nil / password '' / host 'localhost' /
# port '61613' / topic '/topic/undefined'.
#
# @param options [Hash] may contain :login, :url, :password, :host, :port, :topic
# @return [nil]
def init_vars(options)
  # Only nil triggers the default; any other value (including false) is kept.
  pick = lambda { |key, default| options[key].nil? ? default : options[key] }

  self.login    = pick.call(:login, '')
  self.url      = pick.call(:url, nil)
  self.password = pick.call(:password, '')
  self.host     = pick.call(:host, 'localhost')
  self.port     = pick.call(:port, '61613')
  self.topic    = pick.call(:topic, '/topic/undefined')
  self.conn = nil
  puts "self url is #{self.url}" if @debug
end
#interim_package(msg, headers, timeout) ⇒ Object
send_sms
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 180
# Sends a message with send_topic_acknowledge and waits for the reply.
#
# The reply XML is parsed with StompMessage::Message.load_xml and its body
# becomes the result. The whole exchange is limited to timeout + 1 seconds;
# the inner send is given timeout - 1.
#
# @param msg [Object] message forwarded to send_topic_acknowledge
# @param headers [Hash] headers forwarded to send_topic_acknowledge
# @param timeout [Numeric] base timeout in seconds
# @return [Object, false] the reply body, or false when no reply arrived in
#   time or an error occurred
def interim_package(msg, headers, timeout)
  result = false
  msg_received_flag = false
  begin
    Timeout.timeout(timeout + 1) do
      # The block parameter is named `reply` so it no longer shadows `msg`.
      send_topic_acknowledge(msg, headers, timeout - 1) do |reply|
        msg_received_flag = true
        m = StompMessage::Message.load_xml(reply)
        result = m.body
        # result= yield m.body if block_given? FIGURE OUT HOW TO MAKE THIS WORK
        result
      end
      # Poll once a second until the reply callback has fired; the enclosing
      # timeout ends the wait if it never does.
      sleep(1) until msg_received_flag
    end
  rescue SystemExit
    # deliberately ignored, as before
  rescue Timeout::Error
    # no reply in time: fall through and return the current result
  rescue StandardError => e
    # Narrowed from `rescue Exception` so signals and interrupts propagate.
    puts "exception #{e.message} class: #{e.class}"
    puts "no receipt"
  end
  result
end
#java? ⇒ Boolean
79 80 81 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 79 def java? @javaflag end |
#jms_message_handling(tjms_dest, tjms_conn) ⇒ Object
157 158 159 160 161 162 163 164 165 166 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 157
# Runs the given block between JMS set-up and tear-down for a request that
# expects a reply: a producer and session are created (stored in @producer
# and @session), a consumer is created on that session, the block is called,
# and then the consumer and the producer session are closed.
#
# Cleanup now happens in `ensure`, so the consumer and session are released
# even when the block raises.
#
# @param tjms_dest [Object] destination passed to jms_create_producer_session
# @param tjms_conn [Object] connection passed to jms_create_producer_session
# @yield performs the actual send/receive using @session and @producer
# @return [Object] the value of the block
def jms_message_handling(tjms_dest, tjms_conn)
  @my_conn = @jms_conn
  @producer, @session = jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    # The consumer listens on @jms_dest (not tjms_dest), as in the original.
    tconsumer = jms_create_consumer(@session, @jms_dest)
    begin
      yield
    ensure
      tconsumer.close
    end
  ensure
    jms_close_producer_session(@producer, @session)
  end
end
#jms_message_one_way(tjms_dest, tjms_conn) ⇒ Object
147 148 149 150 151 152 153 154 155 156 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 147
# Runs the given block between JMS set-up and tear-down for a send that does
# not wait for a reply: a producer and session are created (stored in
# @producer and @session), the block is called, and the producer session is
# closed. No consumer is created.
#
# Cleanup now happens in `ensure`, so the session is released even when the
# block raises.
#
# @param tjms_dest [Object] destination passed to jms_create_producer_session
# @param tjms_conn [Object] connection passed to jms_create_producer_session
# @yield performs the actual send using @session and @producer
# @return [Object] the value of the block
def jms_message_one_way(tjms_dest, tjms_conn)
  @producer, @session = jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    yield
  ensure
    jms_close_producer_session(@producer, @session)
  end
end
#jms_msg_result(msg) ⇒ Object
setup_auto_close
42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 42
# Sends a message over JMS and returns the acknowledged result, waiting with
# a timeout value of 50 passed to jms_send_ack.
#
# NOTE(review): the name of the wrapping call was lost from the listing
# ("result =(@jms_dest, @jms_conn) { ... }"). jms_message_handling is used
# because it creates the consumer that an acknowledged send needs - confirm
# against the real source file.
#
# @param msg [#to_xml] message to send
# @return [Object] the value returned by jms_send_ack
def jms_msg_result(msg)
  @my_conn = @jms_conn
  jms_message_handling(@jms_dest, @jms_conn) do
    jms_send_ack(@session, @producer, msg.to_xml, 50)
  end
end
#open_connection ⇒ Object
115 116 117 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 115
# Opens the Stomp connection for this instance unless one already exists,
# delegating to the class-level open_connection with this object's
# credentials, host and port.
#
# @return [Object, nil] the new connection, or nil when one was already open
def open_connection
  return unless self.conn.nil?

  self.conn = StompMessage::StompSendTopic.open_connection(
    @conn, self.login, self.password, self.host, self.port
  )
end
#post_stomp(msg, headers) ⇒ Object
post stomp message to url
135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 135
# Posts a message as XML over HTTP to this object's url on host:port.
#
# Fixes two defects of the original: the caller's headers were merged into a
# throw-away hash and never sent, and the HTTP session was started without a
# block and therefore never finished.
#
# @param msg [#to_xml] message to post
# @param headers [Hash, nil] extra HTTP headers; they override the default
#   "Content-type" => "text/xml"
# @return [Object] the response returned by the HTTP post
def post_stomp(msg, headers)
  response_header = { "Content-type" => "text/xml" }.merge(headers || {})
  target = self.url # + "/" + self.topic
  # Block form closes the HTTP session when the block ends.
  Net::HTTP.start(self.host, self.port) do |ht|
    puts "posting to: #{self.host}: #{self.port} #{target} message: #{msg.to_xml}"
    r = ht.post(target, msg.to_xml, response_header)
    puts "result: #{r.to_s}"
    r
  end
end
#send_email_stomp(from, from_alias, to, subject, message) ⇒ Object
send_sms
260 261 262 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 260
# Instance-level convenience wrapper around the class method of the same
# name; all arguments are passed through unchanged.
#
# @param from [String] sender address
# @param from_alias [String] display name for the sender
# @param to [String] comma-separated recipient addresses
# @param subject [String] subject line
# @param message [String] response text for the mail body
# @return [Object] the value returned by StompSendTopic.send_email_stomp
def send_email_stomp(from, from_alias, to, subject, message)
  StompMessage::StompSendTopic.send_email_stomp(from, from_alias, to, subject, message)
end
#send_topic(msg, headers, &r_block) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 167
# Sends a message to this object's topic without waiting for a reply.
#
# In Java mode the message goes out through JMS; otherwise it is sent on the
# Stomp connection with 'persistent' => 'false' plus the caller's headers.
# The original discarded the merged headers (its own comment suspected the
# bug), so caller headers never reached the Stomp send; they do now.
#
# NOTE(review): both method names in the Java branch were lost from the
# listing ("(@jms_dest, @jms_conn) { self.(@session,@producer,headers,...) }").
# jms_message_one_way and jms_send_message are the matching one-way wrapper
# and JmsTools sender - confirm against the real source file.
#
# @param msg [#to_xml] message to send
# @param headers [Hash, nil] extra headers; they override the defaults
# @param r_block [Proc] optional block handed to the Stomp send
# @return [Object] the value of the underlying send
def send_topic(msg, headers, &r_block)
  open_connection
  more_headers = { 'persistent' => 'false' }.merge(headers || {})
  if java?
    jms_message_one_way(@jms_dest, @jms_conn) do
      jms_send_message(@session, @producer, headers, msg.to_xml)
    end
  else
    self.conn.send(self.topic, msg.to_xml, more_headers, &r_block)
  end
  # Thread.pass
end
#send_topic_ack(msg, headers, timeout = 75, &r_block) ⇒ Object
211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 211
# Sends a message and waits for the acknowledged result.
#
# In Java mode the exchange runs over JMS via jms_send_ack; otherwise it is
# delegated to interim_package.
#
# NOTE(review): the name of the JMS wrapper call was lost from the listing
# ("result =(@jms_dest, @jms_conn) { ... }"). jms_message_handling is used
# because it creates the consumer an acknowledged send needs - confirm
# against the real source file.
#
# @param msg [#to_xml] message to send
# @param headers [Hash] headers, used on the non-Java path only
# @param timeout [Numeric] seconds to wait for the reply (default 75)
# @param r_block [Proc] accepted for interface compatibility; not used
# @return [Object, false] the reply, or false when interim_package got none
def send_topic_ack(msg, headers, timeout = 75, &r_block)
  if java?
    jms_message_handling(@jms_dest, @jms_conn) do
      jms_send_ack(@session, @producer, msg.to_xml, timeout)
    end
  else
    interim_package(msg, headers, timeout)
  end
end
#send_topic_acknowledge(msg, headers, timeout = 60) ⇒ Object
be careful with the receipt topic calculations… strange errors on ActiveMQ
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 226
# Sends a message and arranges for the reply to be yielded to the caller's
# block.
#
# A temporary receipt topic ("/topic/rcpt_client<0..29>") is subscribed, the
# message is sent with a 'reply-to' header naming that topic, and a watchdog
# thread unsubscribes after timeout + 1 seconds if no reply was handled.
# When a reply arrives it is acknowledged and its body is yielded.
#
# Be careful with the receipt topic calculation: the suffix range was reduced
# from 1000 to 30 by the original author because the larger range seemed to
# create connections on ActiveMQ.
#
# The original discarded the merged headers, so caller headers were never
# sent; they are now merged into the outgoing headers.
#
# @param msg [#to_xml] message to send
# @param headers [Hash, nil] extra headers; they override the defaults
# @param timeout [Numeric] seconds allowed for handling the reply (default 60)
# @yield [body] the body of the reply message
# @return [Thread] the watchdog thread
def send_topic_acknowledge(msg, headers, timeout = 60)
  open_connection
  receipt_topic = "/topic/rcpt_client#{rand(30)}"
  receipt_flag = false
  # The block parameter is named `reply` so it no longer shadows `msg`.
  self.conn.subscribe(receipt_topic, { :ack => "client" }) do |reply|
    begin
      Timeout.timeout(timeout) do
        self.conn.acknowledge(reply, reply.headers)
        yield reply.body
      end
    rescue StandardError => e
      # Narrowed from `rescue Exception`; Timeout::Error is still covered.
      puts "exception #{e.message}"
    ensure
      receipt_flag = true
      self.conn.unsubscribe receipt_topic
    end
  end
  more_headers = { 'persistent' => 'false', 'reply-to' => "#{receipt_topic}" }.merge(headers || {})
  self.conn.send(self.topic, msg.to_xml, more_headers)
  Thread.new do
    sleep(timeout + 1)
    puts "calling unsubscribe on #{receipt_topic}" if !receipt_flag
    self.conn.unsubscribe receipt_topic if !receipt_flag
  end
end
#send_topic_jms_shutdown ⇒ Object
at_exit { puts "#{self.class}: auto close exit block" #if @debug
send_topic_jms_shutdown }
65 66 67 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 65 def send_topic_jms_shutdown jms_shutdown(@jms_dest, @jms_conn, @session, @producer, @consumer) end |
#set_up_jms(topic) ⇒ Object
56 57 58 59 60 61 62 63 64 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 56
# Starts JMS and creates the destination and connection for a topic, storing
# them in @jms_dest and @jms_conn.
#
# The value passed to jms_start used to be hard-coded (the original carried a
# "THIS NEEDS TO BE A VAR" note); it is now an optional parameter whose
# default keeps the old behaviour.
#
# @param topic [String] topic passed to jms_create_destination_connection
# @param connection_factory [String] value passed to jms_start
#   (presumably a JMS connection factory name - confirm against JmsTools)
# @return [Object] the value returned by jms_create_destination_connection
def set_up_jms(topic, connection_factory = "TopicConnectionFactory")
  jms_start(connection_factory)
  @jms_dest, @jms_conn = jms_create_destination_connection(topic)
  # Session, producer and consumer are not created here; see
  # jms_message_handling / jms_message_one_way / create_consumer.
end
#setup_auto_close ⇒ Object
only call this once
83 84 85 86 87 88 89 90 91 |
# File 'lib/stomp_message/stomp_send_topic.rb', line 83
# Registers an at_exit hook that closes the topic and disconnects when the
# process ends (the hook does nothing in Java mode). Only call this once:
# the @close_ok flag prevents a second registration.
#
# @return [true]
def setup_auto_close
  unless @close_ok
    at_exit do
      puts "#{self.class}: auto close exit block" if @debug
      unless self.java?
        close_topic
        disconnect_stomp
      end
    end
  end
  @close_ok = true
end