Class: StompMessage::StompServer
- Inherits:
-
Object
- Object
- StompMessage::StompServer
- Includes:
- JmsTools
- Defined in:
- lib/stomp_message/stomp_server.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#conn ⇒ Object
include MonitorMixin.
-
#exception_count ⇒ Object
include MonitorMixin.
-
#guard ⇒ Object
include MonitorMixin.
-
#host ⇒ Object
include MonitorMixin.
-
#jms_source ⇒ Object
include MonitorMixin.
-
#login ⇒ Object
include MonitorMixin.
-
#msg_count ⇒ Object
include MonitorMixin.
-
#mythreads ⇒ Object
include MonitorMixin.
-
#password ⇒ Object
include MonitorMixin.
-
#port ⇒ Object
include MonitorMixin.
-
#queue ⇒ Object
include MonitorMixin.
-
#ss_start_time ⇒ Object
include MonitorMixin.
-
#thread_count ⇒ Object
include MonitorMixin.
-
#topic ⇒ Object
include MonitorMixin.
-
#variables ⇒ Object
include MonitorMixin.
Instance Method Summary collapse
- #check_origin(thash) ⇒ Object
-
#check_session ⇒ Object
exit!.
- #check_thread ⇒ Object
-
#close_topic ⇒ Object
close the topic, override if necessary.
- #connect_connection ⇒ Object
-
#connect_topic ⇒ Object
name is message command.
- #create_dest_conn ⇒ Object
-
#create_jms_mdb_connections ⇒ Object
puts "JMS create dest_conn :dest #{@ss_jms_dest.inspect} conn: #{@ss_jms_conn.inspect}".
- #disconnect_stomp ⇒ Object
- #do_jms_setup ⇒ Object
-
#get_id ⇒ Object
Define thread-specific variables here, e.g. Thread.current[:name] = blah blah.
- #handle_exception(e) ⇒ Object
-
#handle_message(msg) ⇒ Object
puts "sms text is: #{sms.text} dest is: #{sms.destination} result is: #{res}".
-
#initialize(options = {}) ⇒ StompServer
constructor
A new instance of StompServer.
- #java? ⇒ Boolean
- #jms_auto_close ⇒ Object
- #jms_close_connections ⇒ Object
- #jms_message_handling(tjms_dest, tjms_conn) ⇒ Object
- #method_missing(name, *args) ⇒ Object
-
#monitor_queue_status ⇒ Object
monitor queue.
- #onMessage(msg_body, msg_hash) ⇒ Object
- #reconnect ⇒ Object
- #run ⇒ Object
- #send_reply(headers, msg) ⇒ Object
- #server_shutdown ⇒ Object
- #setup_auto_close ⇒ Object
- #setup_thread_specific_items(mythread_number) ⇒ Object
- #stomp_DEBUG(msg, stomp_msg) ⇒ Object
- #stomp_PING(msg, stomp_msg) ⇒ Object
- #stomp_RECONNECT(msg, stomp_msg) ⇒ Object
- #version_number ⇒ Object
Methods included from JmsTools
#define_source_id, #jms_close_producer_session, #jms_create_consumer, #jms_create_consumer_session, #jms_create_destination_connection, #jms_create_producer, #jms_create_producer_session, #jms_create_session, #jms_kill_logging, #jms_manage_headers, #jms_message_for_me, #jms_my_id, #jms_next_transaction_id, #jms_on_message, #jms_persistent, #jms_send_ack, #jms_send_message, #jms_set_debug, #jms_shutdown, #jms_start, #unique_source_id
Constructor Details
#initialize(options = {}) ⇒ StompServer
Returns a new instance of StompServer.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/stomp_message/stomp_server.rb', line 44
#
# Creates the server, reads the connection settings from +options+ and
# connects to the message broker.
#
# Recognised keys in +options+ (a missing key and a nil value are treated alike):
#   :login        default ''
#   :password     default ''
#   :host         default 'localhost'
#   :port         default '61613'
#   :topic        default '/topic/please_define'
#   :thread_count default '1' (stored after to_i)
#   :jms_source   default nil
#
# When RUBY_PLATFORM matches /java/ the JMS setup (do_jms_setup) is run.
# Otherwise a Queue is created, the connection is opened, the topic is
# subscribed and the at_exit cleanup is registered.
# An INT trap is installed in both cases.
def initialize(options={})
  self.mythreads = []
  self.ss_start_time = Time.now
  puts "VERSION StompServer starting:: #{self.class} #{self.version_number}"

  # The same flag is stored twice: java? reads @javaflag, get_id reads @java_flag.
  @javaflag = @java_flag = (RUBY_PLATFORM =~ /java/) ? true : false
  @my_hostname = Socket.gethostname
  self.guard = Mutex.new # use like self.guard.synchronize { code for synchronize here }
  self.variables = []

  # Only nil selects the default, so an explicit false or '' is kept as given.
  setting = lambda { |key, default| options[key].nil? ? default : options[key] }
  self.login        = setting.call(:login, '')
  self.jms_source   = setting.call(:jms_source, nil)
  self.thread_count = setting.call(:thread_count, '1').to_i
  self.password     = setting.call(:password, '')
  self.host         = setting.call(:host, 'localhost')
  self.port         = setting.call(:port, '61613')
  self.topic        = setting.call(:topic, '/topic/please_define')
  self.msg_count = self.exception_count = 0

  if self.java?
    do_jms_setup
  else
    self.queue = Queue.new
    self.conn = nil
    connect_connection
    connect_topic
    setup_auto_close
  end

  @debug = false
  puts "#{self.class}: message broker host is: #{host} port is #{port} topic is: #{self.topic} threads #{self.thread_count} my host: #{@my_hostname} jms source #{self.jms_source} java flag #{@javaflag}"
  puts "VERSION StompServer finalized:: #{self.class} #{self.version_number}"

  # NOTE(review): exit! terminates without running at_exit handlers, so the
  # cleanup registered by setup_auto_close does not run on INT - confirm
  # that this is intended.
  trap("INT") {
    puts "#{Time.now}: #{self.class} in interrupt trap\n"
    jms_close_connections
    exit!
  }
end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args) ⇒ Object
257 258 259 260 261 |
# File 'lib/stomp_message/stomp_server.rb', line 257
#
# Fallback for calls to methods this object does not define. The handlers
# dispatch on the command name carried by a message, so an unknown command
# ends up here. It is logged and answered with the "no reply" pair.
#
# name - name of the missing method
# args - arguments of the call (ignored)
#
# Returns [false, ""].
def method_missing(name, *args)
  puts "Method missing called: #{name}", "Likely invalid message recieved"
  [false, ""]
end
Instance Attribute Details
#conn ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def conn @conn end |
#exception_count ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def exception_count @exception_count end |
#guard ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def guard @guard end |
#host ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def host @host end |
#jms_source ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def jms_source @jms_source end |
#login ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def login @login end |
#msg_count ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def msg_count @msg_count end |
#mythreads ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def mythreads @mythreads end |
#password ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def password @password end |
#port ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def port @port end |
#queue ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def queue @queue end |
#ss_start_time ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def ss_start_time @ss_start_time end |
#thread_count ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def thread_count @thread_count end |
#topic ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def topic @topic end |
#variables ⇒ Object
include MonitorMixin
40 41 42 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def variables @variables end |
Instance Method Details
#check_origin(thash) ⇒ Object
295 296 297 298 299 300 301 |
# File 'lib/stomp_message/stomp_server.rb', line 295
#
# Decides whether the message described by the header hash +thash+ came
# from somebody else.
#
# Returns false when +thash+ has no 'JMSOrigin' entry. Otherwise returns true
# only if that entry, converted to a string, differs from jms_source.
def check_origin(thash)
  origin = thash['JMSOrigin']
  return false if origin.nil?

  origin.to_s != self.jms_source
end
#check_session ⇒ Object
exit!
159 160 161 162 |
# File 'lib/stomp_message/stomp_server.rb', line 159
#
# NOT USED (per the original author's note).
# Creates the JMS producer and session unless both already exist.
#
# Returns the value of jms_create_producer_session when it had to be called,
# nil otherwise.
def check_session
  return unless @ss_producer.nil? || @ss_session.nil?

  @ss_producer, @ss_session = jms_create_producer_session(@ss_jms_dest, @ss_jms_conn)
end
#check_thread ⇒ Object
236 237 238 239 240 241 242 |
# File 'lib/stomp_message/stomp_server.rb', line 236
#
# Debug aid: when @debug is set, prints the current thread and every
# key/value pair of the per-thread hash stored in variables[get_id].
# Does nothing (and returns nil) when @debug is not set.
def check_thread
  return unless @debug

  slot = get_id
  puts "----check thread id: #{slot} Thread value: #{Thread.current.inspect} "
  self.variables[slot].each do |key, val|
    puts "---- key #{key} is: #{val.inspect}"
  end
end
#close_topic ⇒ Object
close the topic, override if necessary
225 226 227 228 |
# File 'lib/stomp_message/stomp_server.rb', line 225
#
# Close the topic, override if necessary.
# Logs the unsubscribe and, when a connection exists, unsubscribes from
# the topic. Returns the result of unsubscribe, or nil without a connection.
def close_topic
  puts "#{self.class} #{@my_hostname} unsubscribing #{self.topic}"
  connection = self.conn
  connection.unsubscribe(self.topic) unless connection.nil?
end
#connect_connection ⇒ Object
130 131 132 |
# File 'lib/stomp_message/stomp_server.rb', line 130
#
# (Re)opens the broker connection. The current connection object, the
# credentials and the endpoint are handed to
# StompMessage::StompSendTopic.open_connection, and whatever it returns
# becomes the new conn.
def connect_connection
  credentials = [self.login, self.password]
  endpoint = [self.host, self.port]
  self.conn = StompMessage::StompSendTopic.open_connection(self.conn, *credentials, *endpoint)
end
#connect_topic ⇒ Object
name is message command
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/stomp_message/stomp_server.rb', line 206
#
# Subscribes to the topic with client acknowledgement and synchronous
# dispatch (see http://activemq.apache.org/stomp.html for the
# activemq.dispatchAsync setting).
#
# For every delivered message the callback increments msg_count,
# acknowledges the message and pushes it onto queue for the worker threads.
# A failure inside the callback is counted in exception_count and logged,
# never raised.
#
# NOTE(review): the message is acknowledged before it is queued and
# processed, so a failure later on does not cause a redelivery.
def connect_topic
  subscription_headers = { :ack => "client", 'activemq.dispatchAsync' => 'false' }
  self.conn.subscribe(self.topic, subscription_headers) { |msg|
    self.msg_count += 1
    begin
      self.conn.acknowledge(msg, msg.headers)
      self.queue << msg
    rescue Exception => e
      # Deliberately broad: the listener must survive any failure.
      self.exception_count += 1
      puts " Thread: :exception found #{e.backtrace}"
      puts "Thread: :exception messag #{e.message}"
    end
  }
end
#create_dest_conn ⇒ Object
101 102 103 104 105 106 107 |
# File 'lib/stomp_message/stomp_server.rb', line 101
#
# Looks up the JMS destination and connection for the topic, but only while
# no JMS connection is stored yet. The pair returned by
# jms_create_destination_connection is kept in @ss_jms_dest / @ss_jms_conn.
#
# Returns that pair when the lookup ran, nil when a connection already existed.
def create_dest_conn
  return unless @ss_jms_conn.nil?

  @ss_jms_dest, @ss_jms_conn = jms_create_destination_connection(self.topic)
end
#create_jms_mdb_connections ⇒ Object
puts "JMS create dest_conn :dest #{@ss_jms_dest.inspect} conn: #{@ss_jms_conn.inspect}"
108 109 110 111 112 113 |
# File 'lib/stomp_message/stomp_server.rb', line 108 def create_jms_mdb_connections puts "----> entering jms mdb connections" jms_start("TopicConnectionFactory",self.jms_source) #if @JmsTools_conn_factory==nil create_dest_conn puts "<---- leaving jms mdb connections" # if @debug end |
#disconnect_stomp ⇒ Object
170 171 172 173 |
# File 'lib/stomp_message/stomp_server.rb', line 170
#
# Logs the shutdown and closes the broker connection when one exists.
# Returns the result of close, or nil when there is no connection.
def disconnect_stomp
  puts "#{self.class} #{@my_hostname} closing connection"
  connection = self.conn
  connection.close unless connection.nil?
end
#do_jms_setup ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/stomp_message/stomp_server.rb', line 114 def do_jms_setup puts "----> entering jms setup" #if @debug setup_thread_specific_items("#{self.jms_source} thread") create_jms_mdb_connections # jms_auto_close check_thread puts "<---- leaving jms setup" # if @debug end |
#get_id ⇒ Object
Define thread-specific variables here, e.g. Thread.current[:name] = blah blah
347 348 349 350 351 |
# File 'lib/stomp_message/stomp_server.rb', line 347
#
# Returns the index used for this thread's slot in variables: always 1 when
# @java_flag is set, otherwise the :id thread-local of the current thread.
def get_id
  @java_flag ? 1 : Thread.current[:id]
end
#handle_exception(e) ⇒ Object
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/stomp_message/stomp_server.rb', line 280
#
# Reports an exception rescued by one of the processing loops. The backtrace
# and message are printed, and a report with the server status is mailed via
# StompMessage::StompSendTopic.send_email_stomp.
#
# e - the rescued exception
#
# Mailing is best effort: a failure to send is logged and swallowed so that
# the calling loop keeps running.
def handle_exception(e)
  thread_name = Thread.current[:name]
  puts " Thread: #{thread_name} :exception found #{e.backtrace}"
  puts "Thread: #{thread_name} :exception messag #{e.message}"

  # Joined rather than appended with << so no string literal is mutated.
  result = [
    "-----------EXCEPTION FOUND----------\n",
    "ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} msg_count #{self.msg_count} exception_count #{self.exception_count}\n",
    "-----exception data--------\n",
    " Thread: #{thread_name} :exception found #{e.backtrace}\n",
    "Thread: #{thread_name} :exception messag #{e.message} e: #{e.inspect}"
  ].join

  begin
    StompMessage::StompSendTopic.send_email_stomp("[email protected]", "STOMP EXCEPTION", "[email protected]", "Thread: #{thread_name} :exception messag #{e.message}", result)
  rescue Exception
    # Deliberately broad and unnamed: the original bound this to +e+, which
    # shadowed the exception being reported.
    puts "Can not send email, please check smtp host setting"
  end
end
#handle_message(msg) ⇒ Object
puts "sms text is: #{sms.text} dest is: #{sms.destination} result is: #{res}"
336 337 338 339 340 341 342 343 344 |
# File 'lib/stomp_message/stomp_server.rb', line 336
#
# Processes one Stomp frame taken from the queue. The frame body is parsed
# into a StompMessage::Message, the method named by the message's command is
# invoked with (message, frame), and a reply is sent when that handler
# asks for one.
#
# msg - the Stomp frame (responds to body and headers)
#
# A handler returns a pair [reply_wanted, reply_message]; the reply is sent
# only when reply_wanted is exactly true. The original tested this with
# `r[0]=true` (an assignment), which replied unconditionally.
#
# NOTE(review): m.command comes from the received message and is passed
# straight to send, so any method of this object can be named by a sender.
def handle_message(msg)
  puts "STOMP message frame is : #{msg} " if @debug
  m = StompMessage::Message.load_xml(msg.body)
  puts "Message type is #{m.command}" if @debug
  r = send(m.command, m, msg)
  send_reply(msg.headers, r[1]) if r[0] == true
end
#java? ⇒ Boolean
94 95 96 |
# File 'lib/stomp_message/stomp_server.rb', line 94 def java? @javaflag end |
#jms_auto_close ⇒ Object
163 164 165 166 167 168 169 |
# File 'lib/stomp_message/stomp_server.rb', line 163
#
# Registers an at_exit hook that logs and then closes the JMS resources
# through jms_close_connections.
def jms_auto_close
  at_exit do
    puts "#{self.class}: in at exit block"
    # PERHAPS SET GLOBALS for dest/conn
    jms_close_connections
  end
end
#jms_close_connections ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/stomp_message/stomp_server.rb', line 143
#
# Shuts the JMS side down. It closes, in this order and skipping whatever is
# nil, the session, the producer, the connection and the consumer. It then
# calls the server_shutdown hook and logs how long the object was alive.
#
# Called from the INT trap installed by initialize and from the at_exit hook
# of jms_auto_close. Returns nil.
def jms_close_connections
  puts "----in jms close connections #{self.topic}--HERE BE DRAGONS"
  [@ss_session, @ss_producer, @ss_jms_conn, @ss_consumer].each do |resource|
    resource.close unless resource.nil?
  end
  server_shutdown
  lifetime = Time.now - self.ss_start_time
  puts "---- Duration: #{lifetime} #{self.class} #{self.version_number} on topic #{self.topic}"
  puts "----after jms shutdown #{self.topic}--- exiting"
end
#jms_message_handling(tjms_dest, tjms_conn) ⇒ Object
176 177 178 179 180 181 182 183 |
# File 'lib/stomp_message/stomp_server.rb', line 176
#
# Runs the given block with a freshly created JMS producer and session.
#
# tjms_dest - destination handed to jms_create_producer_session
# tjms_conn - connection handed to jms_create_producer_session
#
# The producer and session are stored in @ss_producer / @ss_session for the
# block to use and are closed again afterwards. The close sits in an ensure
# clause so the pair is released even when the block raises; the exception
# still propagates to the caller.
#
# Returns the value of the block.
def jms_message_handling(tjms_dest, tjms_conn)
  @ss_producer, @ss_session = self.jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    yield
  ensure
    jms_close_producer_session(@ss_producer, @ss_session)
  end
end
#monitor_queue_status ⇒ Object
monitor queue
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/stomp_message/stomp_server.rb', line 263
#
# Monitor queue: starts a background thread and records it in mythreads.
# After every sleep(1000) the thread prints the queue size and reconnects
# when the connection reports that it is no longer open. Anything raised
# inside one round is passed to handle_exception and the loop continues.
#
# Returns mythreads.
def monitor_queue_status
  puts "starting up queue status"
  watcher = Thread.new do
    while true
      begin
        sleep(1000)
        puts "QUEUE size is: #{self.queue.size}"
        unless self.conn.open?
          puts "restarting connection"
          self.reconnect
        end
      rescue Exception => e
        handle_exception(e)
      end
    end
  end
  self.mythreads << watcher
end
#onMessage(msg_body, msg_hash) ⇒ Object
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/stomp_message/stomp_server.rb', line 302
#
# Entry point for a message delivered as a body plus a header hash (the code
# wraps both in a StompMessage::JmsMessage).
#
# msg_body - XML text of the message
# msg_hash - header hash; 'JMSOrigin' identifies the sender
#
# Messages rejected by check_origin are logged and ignored. For all others
# the body is parsed, the method named by the message's command is invoked
# with (message, jms message) and, when that handler returns
# [true, reply], the reply is sent. Any exception is counted and handed to
# handle_exception. Returns nil.
def onMessage(msg_body, msg_hash)
  if @debug
    puts "message body is : #{msg_body} hash #{msg_hash}"
    puts "my id is #{jms_my_id} message id is #{msg_hash['JMSOrigin']}"
  end
  started_at = Time.now
  if check_origin(msg_hash)
    begin
      self.msg_count += 1
      check_thread if @debug
      parsed = StompMessage::Message.load_xml(msg_body)
      puts "----> #{Time.now} received msg is #{parsed.command}"
      jms_msg = StompMessage::JmsMessage.new(msg_body, msg_hash)
      puts "jms message: #{jms_msg.inspect}" if @debug
      outcome = send(parsed.command, parsed, jms_msg)
      elapsed = Time.now - started_at
      puts "--- Message processing before reply: #{self.topic}: #{parsed.command} : #{elapsed}"
      send_reply(jms_msg.headers, outcome[1]) if outcome[0] == true
      elapsed = Time.now - started_at
      puts "--- Message processed #{self.topic}: #{parsed.command} : #{elapsed}"
    rescue Exception => e
      self.exception_count += 1
      handle_exception(e)
    end
  else
    puts "----> received message from myself...ignoring #{msg_hash['JMSOrigin']}"
  end
  puts "<-----finished onMesage"
end
#reconnect ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/stomp_message/stomp_server.rb', line 122
#
# Drops the current broker connection (when there is one), opens a new one
# and subscribes to the topic again. Returns the result of connect_topic.
def reconnect
  puts "about to reconnect"
  stale = self.conn
  stale.close unless stale.nil?
  connect_connection
  connect_topic
end
#run ⇒ Object
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 |
# File 'lib/stomp_message/stomp_server.rb', line 363
#
# Main loop of the queue-based server. It starts thread_count worker
# threads, starts the queue monitor and then blocks joining every thread in
# mythreads.
#
# Each worker registers its thread-specific data and then loops forever:
# it pops a frame from queue and passes it to handle_message. An exception
# raised while handling a frame is counted and reported through
# handle_exception, after which the worker continues with the next frame.
#
# NOTE(review): the call inside the worker loop had lost its method name in
# the extracted text; handle_message is restored here - confirm against the
# repository.
def run
  1.upto(self.thread_count) { |c|
    # c is passed through Thread.new so every worker gets its own copy.
    self.mythreads << Thread.new(c) { |ctmp|
      setup_thread_specific_items(ctmp)
      while true
        begin
          msg = self.queue.pop
          handle_message(msg)
        rescue Exception => e
          # Deliberately broad: a worker must survive any failure.
          self.exception_count += 1
          handle_exception(e)
        end
      end
    }
  }
  monitor_queue_status
  self.mythreads.each { |t| t.join }
end
#send_reply(headers, msg) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/stomp_message/stomp_server.rb', line 184
#
# Sends the reply +msg+ for a request whose headers are +headers+.
#
# headers - header hash of the request; 'reply-to', 'id' and 'JMSReplyTo'
#           are read
# msg     - reply message (responds to to_xml)
#
# Outside Java the reply goes to the 'reply-to' destination over the Stomp
# connection with the headers 'persistent' => 'false' plus the request's
# 'id' and 'JMSReplyTo'. Under Java the reply is sent inside
# jms_message_handling using the request headers.
#
# The original also executed `response_header.merge headers`, whose result
# was discarded. That no-op is removed, so the reply headers stay exactly
# the three listed above.
#
# NOTE(review): both call names in the Java branch had been stripped from
# the extracted text. jms_message_handling and jms_send_message (listed
# among the JmsTools methods) are restored here - confirm against the
# repository.
def send_reply(headers, msg)
  reply_topic = headers['reply-to']
  response_header = {
    'persistent' => 'false',
    'id'         => headers['id'],
    'JMSReplyTo' => headers['JMSReplyTo']
  }
  puts "SEND REPLY #{response_header.inspect}" if @debug
  puts "<----- reply to follow:"
  if self.java?
    jms_message_handling(@ss_jms_dest, @ss_jms_conn) {
      jms_send_message(@ss_session, @ss_producer, headers, msg.to_xml)
    }
  else
    self.conn.send(reply_topic, msg.to_xml, response_header)
  end
end
#server_shutdown ⇒ Object
140 141 142 |
# File 'lib/stomp_message/stomp_server.rb', line 140 def server_shutdown puts "----in server shutdown" end |
#setup_auto_close ⇒ Object
133 134 135 136 137 138 139 |
# File 'lib/stomp_message/stomp_server.rb', line 133
#
# Registers an at_exit hook that logs, unsubscribes from the topic
# (close_topic) and closes the broker connection (disconnect_stomp).
def setup_auto_close
  at_exit do
    puts "#{self.class}: in at exit block"
    close_topic
    disconnect_stomp
  end
end
#setup_thread_specific_items(mythread_number) ⇒ Object
352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/stomp_message/stomp_server.rb', line 352
#
# Prepares the calling thread: stores a display name and an id in the
# thread-locals :name and :id, creates an empty hash for the thread in
# variables[get_id] and dumps the state through check_thread.
#
# mythread_number - value stored as the thread id and shown in the name
#
# Returns nil.
def setup_thread_specific_items(mythread_number)
  puts "----> entering setup threads #{mythread_number}"
  current = Thread.current
  current[:name] = "Thread: #{mythread_number}"
  current[:id] = mythread_number
  self.variables[get_id] = {}
  check_thread
  puts "<---- leaving setup threads" if @debug
end
#stomp_DEBUG(msg, stomp_msg) ⇒ Object
251 252 253 254 255 256 |
# File 'lib/stomp_message/stomp_server.rb', line 251
#
# Command handler that toggles the debug flag and logs the new state.
# Both arguments are ignored. Returns [false, ""], i.e. no reply is sent.
def stomp_DEBUG(msg, stomp_msg)
  @debug = @debug ? false : true
  puts "debug flag is now #{@debug}"
  [false, ""]
end
#stomp_PING(msg, stomp_msg) ⇒ Object
229 230 231 232 233 234 235 |
# File 'lib/stomp_message/stomp_server.rb', line 229
#
# Command handler answering a ping with a status line: class, object id,
# topic, host, current thread, message and exception counters and the
# current time. In debug mode the inspect output of the server is appended.
# Both arguments are ignored.
#
# Returns [true, reply], where reply is a 'stomp_REPLY' message.
def stomp_PING(msg, stomp_msg)
  status = "ALIVE Class: #{self.class.to_s} id: #{self.object_id} listening to: #{self.topic} on host #{@my_hostname} id thread #{Thread.current.inspect} msg_count #{self.msg_count} exception_count #{self.exception_count} time: #{Time.now}\n"
  details = @debug ? "Object details #{self.inspect}" : ""
  [true, StompMessage::Message.new('stomp_REPLY', status + details)]
end
#stomp_RECONNECT(msg, stomp_msg) ⇒ Object
243 244 245 246 247 248 249 250 |
# File 'lib/stomp_message/stomp_server.rb', line 243
#
# Command handler that reconnects to the broker and then reports the server
# status, including whether the new connection is open.
# Both arguments are ignored.
#
# Returns [true, reply], where reply is a 'stomp_REPLY' message.
def stomp_RECONNECT(msg, stomp_msg)
  reconnect
  status = "ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} msg_count #{self.msg_count} exception_count #{self.exception_count} id #{self.inspect} thread #{Thread.current.inspect} connection status #{self.conn.open?}"
  [true, StompMessage::Message.new('stomp_REPLY', status)]
end
#version_number ⇒ Object
97 98 99 100 |
# File 'lib/stomp_message/stomp_server.rb', line 97
#
# Returns the version string of the top-level module this object's class
# lives in, i.e. <Module>::VERSION::STRING (note: needs version/string).
#
# The module name is the part of the class name before the first '::'. The
# constant is resolved with const_get instead of eval-ing a string built
# from the class name.
#
# Raises NameError when the module or its VERSION::STRING constant does not
# exist.
def version_number
  module_name = self.class.to_s.split('::')[0]
  Object.const_get(module_name)::VERSION::STRING
end