Class: StompMessage::StompServer
- Inherits:
-
Object
- Object
- StompMessage::StompServer
- Defined in:
- lib/stomp_message/stomp_server.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#conn ⇒ Object
Returns the value of attribute conn.
-
#exception_count ⇒ Object
Returns the value of attribute exception_count.
-
#host ⇒ Object
Returns the value of attribute host.
-
#login ⇒ Object
Returns the value of attribute login.
-
#msg_count ⇒ Object
Returns the value of attribute msg_count.
-
#mythreads ⇒ Object
Returns the value of attribute mythreads.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#thread_count ⇒ Object
Returns the value of attribute thread_count.
-
#topic ⇒ Object
Returns the value of attribute topic.
Instance Method Summary collapse
-
#close_topic ⇒ Object
close the topic, override if necessary.
- #connect_connection ⇒ Object
-
#connect_topic ⇒ Object
name is message command.
- #disconnect_stomp ⇒ Object
- #handle_message(msg) ⇒ Object
-
#initialize(options = {}) ⇒ StompServer
constructor
A new instance of StompServer.
- #method_missing(name, *args) ⇒ Object
-
#monitor_queue_status ⇒ Object
monitor queue.
-
#run ⇒ Object
puts “sms text is: #smssms.text dest is: #smssms.destination result is: #res”.
- #send_reply(headers, msg) ⇒ Object
- #setup_auto_close ⇒ Object
- #stomp_DEBUG(msg, stomp_msg) ⇒ Object
- #stomp_PING(msg, stomp_msg) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ StompServer
Returns a new instance of StompServer.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/stomp_message/stomp_server.rb', line 11 def initialize(={}) @my_hostname = Socket.gethostname self.login = [:login]==nil ? '' : [:login] num = [:thread_count]==nil ? '2' : [:thread_count] self.thread_count=num.to_i self.password = [:password]==nil ? '' : [:password] self.host = [:host]==nil ? 'localhost' : [:host] self.port = [:port]==nil ? '61613' : [:port] self.topic = [:topic]==nil ? '/topic/please_define' : [:topic] puts "#{self.class}: message broker host is: #{host} port is #{port} topic is: #{self.topic} threads #{self.thread_count} my host: #{@my_hostname}" self.msg_count = self.exception_count=0 self.queue= Queue.new self.conn=nil connect_connection # self.conn = Stomp::Client.new self.login, self.password, self.host, self.port, false # self.conn = Stomp::Connection.open self.login, self.password, self.host , self.port, false connect_topic @debug=false puts "finished StompServer initializing" setup_auto_close trap("INT") { puts "#{Time.now}: #{self.class} in interrupt trap\n" #close_topic #disconnect_stomp #setup_auto_close already done in INIT exit(0)} end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(name, *args) ⇒ Object
98 99 100 101 |
# File 'lib/stomp_message/stomp_server.rb', line 98 def method_missing(name, *args) puts "Method missing called: #{name}" puts "Likely invalid message recieved" end |
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def conn @conn end |
#exception_count ⇒ Object
Returns the value of attribute exception_count.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def exception_count @exception_count end |
#host ⇒ Object
Returns the value of attribute host.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def host @host end |
#login ⇒ Object
Returns the value of attribute login.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def login @login end |
#msg_count ⇒ Object
Returns the value of attribute msg_count.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def msg_count @msg_count end |
#mythreads ⇒ Object
Returns the value of attribute mythreads.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def mythreads @mythreads end |
#password ⇒ Object
Returns the value of attribute password.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def password @password end |
#port ⇒ Object
Returns the value of attribute port.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def port @port end |
#queue ⇒ Object
Returns the value of attribute queue.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def queue @queue end |
#thread_count ⇒ Object
Returns the value of attribute thread_count.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def thread_count @thread_count end |
#topic ⇒ Object
Returns the value of attribute topic.
7 8 9 |
# File 'lib/stomp_message/stomp_server.rb', line 7 def topic @topic end |
Instance Method Details
#close_topic ⇒ Object
close the topic, override if necessary
85 86 87 88 |
# File 'lib/stomp_message/stomp_server.rb', line 85 def close_topic puts "#{self.class} #{@my_hostname} unsubscribing #{self.topic}" self.conn.unsubscribe self.topic if self.conn!=nil end |
#connect_connection ⇒ Object
37 38 39 |
# File 'lib/stomp_message/stomp_server.rb', line 37 def connect_connection self.conn = StompMessage::StompSendTopic.open_connection(self.conn, self.login, self.password, self.host, self.port) end |
#connect_topic ⇒ Object
name is message command
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/stomp_message/stomp_server.rb', line 66 def connect_topic # scott old ack auot # see http://activemq.apache.org/stomp.html for activemq.dispathAsync settig self.conn.subscribe( self.topic, { :ack =>"client" , 'activemq.dispatchAsync' => 'false'}) { |msg| self.msg_count+=1 begin self.conn.acknowledge(msg,msg.headers) self.queue << msg rescue Exception => e self.exception_count+=1 puts " Thread: :exception found #{e.backtrace}" puts "Thread: :exception messag #{e.}" end } end |
#disconnect_stomp ⇒ Object
47 48 49 50 |
# File 'lib/stomp_message/stomp_server.rb', line 47 def disconnect_stomp puts "#{self.class} #{@my_hostname} closing connection" self.conn.close() if self.conn !=nil end |
#handle_message(msg) ⇒ Object
118 119 120 121 122 123 124 125 |
# File 'lib/stomp_message/stomp_server.rb', line 118 def (msg) puts "STOMP message frame is : #{msg} " if @debug m=StompMessage::Message.load_xml(msg.body) # puts "Message is: #{m.to_xml}" if @debug puts "Message type is #{m.command}" if @debug send(m.command, m, msg) # effectively case statement (Should SCREEN HERE) # puts "sms text is: #{sms.text} dest is: #{sms.destination} result is: #{res}" end |
#monitor_queue_status ⇒ Object
monitor queue
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/stomp_message/stomp_server.rb', line 103 def monitor_queue_status puts "starting up queue status" self.mythreads << Thread.new { while true sleep(30) puts "QUEUE size is: #{self.queue.size}" # puts "-------conn var is on #{self.conn.inspect}" if !self.conn.open? puts "restarting connection" self.conn=nil connect_connection connect_topic end end # while } end |
#run ⇒ Object
puts “sms text is: #StompMessage::StompServer.smssms.text dest is: #StompMessage::StompServer.smssms.destination result is: #res”
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/stomp_message/stomp_server.rb', line 126 def run self.mythreads = [] 1.upto(self.thread_count) { |c| # create the threads here puts "creating thread: #{c}" self.mythreads << Thread.new(c) { |ctmp| while true begin msg=self.queue.pop (msg) rescue Exception => e self.exception_count+=1 puts " Thread: #{ctmp} :exception found #{e.backtrace}" puts "Thread: #{ctmp} :exception messag #{e.}" result = "-----------EXCEPTION FOUND----------\n" result << "ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} msg_count #{self.msg_count} exception_count #{self.exception_count}\n" result << "-----exception data--------\n" result << " Thread: #{ctmp} :exception found #{e.backtrace}\n" result << "Thread: #{ctmp} :exception messag #{e.}" begin StompMessage::StompSendTopic.send_email_stomp("[email protected]", "STOMP EXCEPTION", "[email protected]","Thread: #{ctmp} :exception messag #{e.}", result) rescue Exception => e puts "Can not send email, please check smtp host setting" end end # Thread.pass end } } monitor_queue_status self.mythreads.each { |t| t.join } # msg = self.conn.receive # puts "after receive" # self.msg_count+=1 # begin # handle_message(msg) # rescue Exception => e # self.exception_count+=1 # puts "exception found #{e.backtrace}" # puts "exception messag #{e.message}" # end # end # while # t.join # wait for t to die.. end |
#send_reply(headers, msg) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/stomp_message/stomp_server.rb', line 51 def send_reply(headers,msg) reply_topic=headers['reply-to'] # puts "headers: #{headers['msisdn']} reply topic #{headers['reply-to']}" # internal_conn = Stomp::Connection.open '', '', self.host, self.port, false # scott old auto # scott not needed self.conn.subscribe(reply_topic, { :ack =>"client" }) {|m| puts "yes boring should not be called: #{m.to_s}" # } # m=StompMessage::Message.new('stomp_REPLY', msg_body) response_header = {'persistent'=>'false'} response_header.merge headers self.conn.send(reply_topic, msg.to_xml, response_header ) # self.conn.unsubscribe reply_topic #testing shoul call this but not certain about it # internal_conn.disconnect end |
#setup_auto_close ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/stomp_message/stomp_server.rb', line 40 def setup_auto_close at_exit { puts "#{self.class}: in at exit block" close_topic disconnect_stomp # self.mythreads.each {|t| t.raise "auto exit" } } end |
#stomp_DEBUG(msg, stomp_msg) ⇒ Object
94 95 96 97 |
# File 'lib/stomp_message/stomp_server.rb', line 94 def stomp_DEBUG(msg,stomp_msg) @debug=!@debug puts "debug flag is now #{@debug}" end |
#stomp_PING(msg, stomp_msg) ⇒ Object
89 90 91 92 93 |
# File 'lib/stomp_message/stomp_server.rb', line 89 def stomp_PING(msg,stomp_msg) body="ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} msg_count #{self.msg_count} exception_count #{self.exception_count}" reply_msg = StompMessage::Message.new('stomp_REPLY', body) send_reply(stomp_msg.headers,reply_msg) if stomp_msg.headers['reply-to']!=nil end |