Class: StompMessage::StompServer

Inherits:
Object
  • Object
show all
Includes:
JmsTools
Defined in:
lib/stomp_message/stomp_server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from JmsTools

#define_source_id, #jms_close_producer_session, #jms_create_consumer, #jms_create_consumer_session, #jms_create_destination_connection, #jms_create_producer, #jms_create_producer_session, #jms_create_session, #jms_kill_logging, #jms_manage_headers, #jms_message_for_me, #jms_my_id, #jms_next_transaction_id, #jms_on_message, #jms_persistent, #jms_send_ack, #jms_send_message, #jms_set_debug, #jms_shutdown, #jms_start, #unique_source_id

Constructor Details

#initialize(options = {}) ⇒ StompServer

Returns a new instance of StompServer.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/stomp_message/stomp_server.rb', line 44

# Builds a server bound to one message-broker topic.
#
# When RUBY_PLATFORM matches /java/ the JMS path is prepared through
# do_jms_setup; otherwise a STOMP connection is opened, the topic is
# subscribed and an at_exit cleanup hook is registered.
#
# Recognised options (a nil value counts as "not given"):
#   :login        - broker login, default ''
#   :password     - broker password, default ''
#   :host         - broker host, default 'localhost'
#   :port         - broker port, default '61613'
#   :topic        - topic name, default '/topic/please_define'
#   :thread_count - worker thread count, default '1' (converted with to_i)
#   :jms_source   - JMS source id, default nil
def initialize(options={})
  # Same rule as "options[key]==nil ? default : options[key]": only nil
  # falls back to the default, so an explicit false is preserved.
  pick = lambda do |key, default|
    value = options[key]
    value.nil? ? default : value
  end

  self.mythreads = []
  self.ss_start_time = Time.now
  puts "VERSION StompServer starting:: #{self.class} #{self.version_number}"
  @javaflag = @java_flag = (RUBY_PLATFORM =~ /java/ ? true : false)
  @my_hostname = Socket.gethostname
  self.guard = Mutex.new   # use like: self.guard.synchronize { ... }
  self.variables = []

  self.login = pick.call(:login, '')
  self.jms_source = options[:jms_source]
  self.thread_count = pick.call(:thread_count, '1').to_i
  self.password = pick.call(:password, '')
  self.host = pick.call(:host, 'localhost')
  self.port = pick.call(:port, '61613')
  self.topic = pick.call(:topic, '/topic/please_define')

  self.msg_count = self.exception_count = 0

  if self.java?
    do_jms_setup
  else
    self.queue = Queue.new
    self.conn = nil
    connect_connection
    connect_topic
    setup_auto_close
  end
  @debug = false
  puts "#{self.class}: message broker host is: #{host} port is #{port} topic is: #{self.topic} threads #{self.thread_count} my host: #{@my_hostname} jms source #{self.jms_source} java flag #{@javaflag}"
  puts "VERSION StompServer finalized:: #{self.class} #{self.version_number}"

  # NOTE(review): exit! terminates without running at_exit handlers, so the
  # hook registered by setup_auto_close (close_topic / disconnect_stomp) does
  # not run on Ctrl-C; only jms_close_connections does. Confirm this is intended.
  trap("INT") do
    puts "#{Time.now}: #{self.class} in interrupt trap\n"
    jms_close_connections
    exit!
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(name, *args) ⇒ Object



257
258
259
260
261
# File 'lib/stomp_message/stomp_server.rb', line 257

# Catch-all for calls to methods this object does not define.
#
# handle_message and onMessage dispatch with send(m.command, ...), so a
# command with no matching method ends up here. The call is logged and the
# pair [false, ""] is returned; both dispatchers send a reply only when the
# first element is true.
def method_missing(name, *args)
  puts "Method missing called: #{name}", "Likely invalid message recieved"
  [false, ""]
end

Instance Attribute Details

#connObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @conn, the broker connection object this class calls
# subscribe, acknowledge, unsubscribe, send, open? and close on.
def conn
  @conn
end

#exception_countObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @exception_count: starts at 0 in initialize and is incremented
# by the rescue handlers in connect_topic, onMessage and run.
def exception_count
  @exception_count
end

#guardObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @guard, the Mutex created in initialize
# (intended use: guard.synchronize { ... }).
def guard
  @guard
end

#hostObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @host, the broker host taken from options[:host]
# (initialize defaults it to 'localhost').
def host
  @host
end

#jms_sourceObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @jms_source, taken from options[:jms_source]; check_origin
# compares it with the 'JMSOrigin' header of incoming messages.
def jms_source
  @jms_source
end

#loginObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @login, the broker login taken from options[:login]
# (initialize defaults it to '').
def login
  @login
end

#msg_countObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @msg_count: starts at 0 in initialize and is incremented once
# per message in the connect_topic callback and in onMessage.
def msg_count
  @msg_count
end

#mythreadsObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @mythreads, the array of Thread objects appended to by run and
# monitor_queue_status.
def mythreads
  @mythreads
end

#passwordObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @password, the broker password taken from options[:password]
# (initialize defaults it to '').
def password
  @password
end

#portObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @port, the broker port taken from options[:port]
# (initialize defaults it to the string '61613').
def port
  @port
end

#queueObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @queue. In non-Java mode initialize assigns a Queue that the
# connect_topic callback fills and the worker threads in run pop from.
def queue
  @queue
end

#ss_start_timeObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @ss_start_time, the Time captured at the start of initialize;
# jms_close_connections uses it to print the object's lifetime.
def ss_start_time
  @ss_start_time
end

#thread_countObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @thread_count, the integer number of worker threads that run
# starts (options[:thread_count] converted with to_i).
def thread_count
  @thread_count
end

#topicObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @topic, the topic name taken from options[:topic]
# (initialize defaults it to '/topic/please_define').
def topic
  @topic
end

#variablesObject

include MonitorMixin



40
41
42
# File 'lib/stomp_message/stomp_server.rb', line 40

# Reader for @variables, an array indexed by get_id; each slot is set to a
# hash by setup_thread_specific_items.
def variables
  @variables
end

Instance Method Details

#check_origin(thash) ⇒ Object



295
296
297
298
299
300
301
# File 'lib/stomp_message/stomp_server.rb', line 295

# Decides whether a message came from a different source than this server.
#
# thash - message header hash; only the 'JMSOrigin' entry is read.
#
# Returns true when 'JMSOrigin' is present and its string form differs from
# jms_source. Returns false when the header is missing or matches.
def check_origin(thash)
  origin = thash['JMSOrigin']
  return false if origin == nil
  origin.to_s != self.jms_source
end

#check_sessionObject

exit!



159
160
161
162
# File 'lib/stomp_message/stomp_server.rb', line 159

# Creates the JMS producer/session pair if either one is missing.
# Marked "NOT USED" by the original author.
#
# Returns nil when both already exist, otherwise the value returned by
# jms_create_producer_session.
def check_session  #NOT USED
  return if @ss_producer != nil && @ss_session != nil
  @ss_producer, @ss_session = jms_create_producer_session(@ss_jms_dest, @ss_jms_conn)
end

#check_threadObject



236
237
238
239
240
241
242
# File 'lib/stomp_message/stomp_server.rb', line 236

# Debug aid: when @debug is set, prints the current thread and every
# key/value stored in this thread's slot of variables. Does nothing otherwise.
def check_thread
  return unless @debug
  slot = get_id
  puts "----check thread id: #{slot} Thread value: #{Thread.current.inspect}  "
  self.variables[get_id].each do |key, val|
    puts "---- key #{key} is: #{val.inspect}"
  end
end

#close_topicObject

close the topic, override if necessary



225
226
227
228
# File 'lib/stomp_message/stomp_server.rb', line 225

# Logs the unsubscribe and, when a connection exists, unsubscribes it from
# the topic. Subclasses may override.
def close_topic
  puts "#{self.class} #{@my_hostname} unsubscribing #{self.topic}"
  connection = self.conn
  return if connection == nil
  connection.unsubscribe self.topic
end

#connect_connectionObject



130
131
132
# File 'lib/stomp_message/stomp_server.rb', line 130

# Opens (or re-opens) the broker connection and stores it in conn.
#
# The current conn plus login, password, host and port are handed to
# StompMessage::StompSendTopic.open_connection; whatever it returns becomes
# the new conn.
def connect_connection
  self.conn = StompMessage::StompSendTopic.open_connection(self.conn, self.login, self.password, self.host, self.port)
end

#connect_topicObject

name is message command



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/stomp_message/stomp_server.rb', line 206

# Subscribes conn to the topic with client acknowledgement.
#
# For every delivered message the callback increments msg_count,
# acknowledges the message on conn and pushes it onto queue. Any exception
# raised while doing so is counted in exception_count and logged, so it does
# not propagate out of the callback.
#
# See http://activemq.apache.org/stomp.html for the activemq.dispatchAsync
# setting.
def connect_topic
  subscribe_headers = { :ack =>"client" , 'activemq.dispatchAsync' => 'false'}
  self.conn.subscribe(self.topic, subscribe_headers) do |msg|
    self.msg_count += 1
    begin
      self.conn.acknowledge(msg, msg.headers)
      self.queue << msg
    rescue Exception => e
      self.exception_count += 1
      puts " Thread: :exception found #{e.backtrace}"
      puts "Thread: :exception messag #{e.message}"
    end
  end
end

#create_dest_connObject



101
102
103
104
105
106
107
# File 'lib/stomp_message/stomp_server.rb', line 101

# Creates the JMS destination/connection pair for the topic, once.
#
# Does nothing (returns nil) when @ss_jms_conn is already set; otherwise
# stores the pair returned by jms_create_destination_connection in
# @ss_jms_dest and @ss_jms_conn.
def create_dest_conn
  return unless @ss_jms_conn == nil
  @ss_jms_dest, @ss_jms_conn = jms_create_destination_connection(self.topic)
end

#create_jms_mdb_connectionsObject

puts "JMS create dest_conn :dest #{@ss_jms_dest.inspect} conn: #{@ss_jms_conn.inspect}"



108
109
110
111
112
113
# File 'lib/stomp_message/stomp_server.rb', line 108

# JMS setup step: calls jms_start with the "TopicConnectionFactory" name and
# this server's jms_source, then create_dest_conn. Entry and exit are logged
# unconditionally.
def create_jms_mdb_connections
   puts "----> entering jms mdb connections"
   jms_start("TopicConnectionFactory",self.jms_source) #if @JmsTools_conn_factory==nil
   create_dest_conn
   puts "<---- leaving jms mdb connections" # if @debug
end

#disconnect_stompObject



170
171
172
173
# File 'lib/stomp_message/stomp_server.rb', line 170

# Logs the shutdown and closes the broker connection when one exists.
def disconnect_stomp
  puts "#{self.class} #{@my_hostname} closing connection"
  connection = self.conn
  return if connection == nil
  connection.close()
end

#do_jms_setupObject



114
115
116
117
118
119
120
121
# File 'lib/stomp_message/stomp_server.rb', line 114

# JMS-mode initialisation, called from initialize when java? is true.
# Registers thread-specific state under the name "<jms_source> thread",
# creates the JMS connections and then runs check_thread (which only prints
# when @debug is set).
def do_jms_setup
    puts "----> entering jms setup" #if @debug
    setup_thread_specific_items("#{self.jms_source} thread")
    create_jms_mdb_connections
#    jms_auto_close
    check_thread
     puts "<---- leaving jms setup" # if @debug
end

#get_idObject

Define thread-specific variables here, e.g. Thread.current[:key] = value.



347
348
349
350
351
# File 'lib/stomp_message/stomp_server.rb', line 347

# Index of the current thread's slot in variables.
#
# Always 1 when @java_flag is set; otherwise the :id thread-local stored by
# setup_thread_specific_items.
def get_id
  @java_flag ? 1 : Thread.current[:id]
end

#handle_exception(e) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/stomp_message/stomp_server.rb', line 280

# Logs an exception and tries to e-mail a report about it.
#
# e - the exception that was rescued by the caller.
#
# The backtrace and message are printed with the current thread's :name,
# then a multi-line report is passed to
# StompMessage::StompSendTopic.send_email_stomp. Any failure while sending
# is swallowed and replaced by a hint on stdout, so this method does not
# raise because of mail problems.
def handle_exception(e)
  thread_name = Thread.current[:name]
  trace_line = " Thread: #{thread_name} :exception found #{e.backtrace}"
  message_line = "Thread: #{thread_name} :exception messag #{e.message}"
  puts trace_line
  puts message_line
  report = [
    "-----------EXCEPTION FOUND----------",
    "ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} msg_count #{self.msg_count} exception_count #{self.exception_count}",
    "-----exception data--------",
    trace_line,
    "#{message_line} e: #{e.inspect}"
  ].join("\n")
  begin
    StompMessage::StompSendTopic.send_email_stomp("[email protected]",
            "STOMP EXCEPTION", "[email protected]", message_line, report)
  rescue Exception
    puts "Can not send email, please check smtp host setting"
  end
end

#handle_message(msg) ⇒ Object

puts "sms text is: #{sms.text} dest is: #{sms.destination} result is: #{res}"



336
337
338
339
340
341
342
343
344
# File 'lib/stomp_message/stomp_server.rb', line 336

# Processes one STOMP frame taken from the queue.
#
# msg - frame object; its body is parsed with StompMessage::Message.load_xml
#       and its headers are used for the reply.
#
# The parsed message's command names the method to invoke; that method is
# called with (parsed_message, msg) and must return a pair
# [reply_wanted, reply_message]. A reply is sent only when the first element
# is true (the original "r[0]=true" was an assignment, so a reply was sent
# for every message, including the [false, ""] results of stomp_DEBUG and
# method_missing).
def handle_message(msg)
  puts "STOMP message frame is : #{msg} "      if @debug
  m = StompMessage::Message.load_xml(msg.body)
  puts "Message type is #{m.command}" if @debug
  # NOTE(review): the command comes from the message body and is passed
  # straight to send, so a sender can name any method of this object;
  # consider restricting it to a known set of commands.
  r = send(m.command, m, msg)
  send_reply(msg.headers, r[1]) if r[0] == true
end

#java?Boolean

Returns:

  • (Boolean)


94
95
96
# File 'lib/stomp_message/stomp_server.rb', line 94

# True when initialize found RUBY_PLATFORM matching /java/ (stored in
# @javaflag); selects the JMS code paths instead of the STOMP ones.
def java?
  @javaflag
end

#jms_auto_closeObject



163
164
165
166
167
168
169
# File 'lib/stomp_message/stomp_server.rb', line 163

# Registers an at_exit hook that logs and then calls jms_close_connections.
# NOTE(review): the only call visible in this file (in do_jms_setup) is
# commented out.
def jms_auto_close
  at_exit { puts "#{self.class}: in at exit block"
  # PERHAPS SET GLOBALS for dest/conn
           jms_close_connections
           # self.mythreads.each {|t| t.raise "auto exit" }
          }
end

#jms_close_connectionsObject



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/stomp_message/stomp_server.rb', line 143

# Shuts down the JMS side of the server.
#
# Closes, in this order and only when set: session, producer, connection,
# consumer. Then calls server_shutdown and prints how long this object has
# existed (Time.now - ss_start_time). Does not exit the process itself.
def jms_close_connections
  puts "----in jms close connections #{self.topic}--HERE BE DRAGONS"
  [@ss_session, @ss_producer, @ss_jms_conn, @ss_consumer].each do |resource|
    resource.close unless resource == nil
  end
  server_shutdown
  elapsed = Time.now - self.ss_start_time
  puts "---- Duration: #{elapsed} #{self.class} #{self.version_number} on topic #{self.topic}"
  puts "----after jms shutdown #{self.topic}--- exiting"
end

#jms_message_handling(tjms_dest, tjms_conn) ⇒ Object



176
177
178
179
180
181
182
183
# File 'lib/stomp_message/stomp_server.rb', line 176

# Runs the given block with a freshly created JMS producer and session.
#
# tjms_dest - JMS destination passed to jms_create_producer_session.
# tjms_conn - JMS connection passed to jms_create_producer_session.
#
# The pair is stored in @ss_producer / @ss_session for the block to use and
# is closed with jms_close_producer_session afterwards. The close now runs
# in an ensure clause, so the session is also released when the block
# raises (previously it leaked in that case); the exception still
# propagates to the caller.
#
# Returns the block's result.
def jms_message_handling(tjms_dest, tjms_conn)
  @ss_producer, @ss_session = jms_create_producer_session(tjms_dest, tjms_conn)
  begin
    yield
  ensure
    jms_close_producer_session(@ss_producer, @ss_session)
  end
end

#monitor_queue_statusObject

monitor queue



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/stomp_message/stomp_server.rb', line 263

# Starts a background thread (appended to mythreads) that loops forever:
# sleep, print the queue size, and call reconnect when conn reports it is
# no longer open. Any exception inside one iteration is passed to
# handle_exception and the loop continues.
def monitor_queue_status
  puts "starting up queue status"
  self.mythreads << Thread.new { while true
               begin
                 # Kernel#sleep takes seconds, so this checks every 1000 seconds.
                 sleep(1000)
                 puts "QUEUE size is: #{self.queue.size}"
                # puts "-------conn var is on #{self.conn.inspect}"
                  if !self.conn.open? 
                    puts "restarting connection"
                    self.reconnect
                  end
                rescue Exception => e
                  handle_exception(e)
                end
               end  # while
                }
end

#onMessage(msg_body, msg_hash) ⇒ Object



302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/stomp_message/stomp_server.rb', line 302

# Handles one incoming message on the JMS path.
#
# msg_body - message body; parsed with StompMessage::Message.load_xml.
# msg_hash - message header hash; 'JMSOrigin' is read from it.
#
# The message is processed only when check_origin(msg_hash) is true, i.e.
# 'JMSOrigin' is present and differs from jms_source; otherwise it is logged
# as coming from this server and ignored. Processing wraps body and headers
# in a StompMessage::JmsMessage, dispatches to the method named by the
# parsed command and sends a reply when that method returns [true, reply].
# Exceptions raised during processing are counted and passed to
# handle_exception rather than propagated.
def onMessage(msg_body,msg_hash)
   puts "message body is : #{msg_body} hash #{msg_hash}"      if @debug
   puts "my id is #{jms_my_id} message id is #{msg_hash['JMSOrigin']}"   if @debug
    start_time=Time.now
   if check_origin(msg_hash)
   begin
      self.msg_count+=1
      check_thread    if @debug
      m=StompMessage::Message.load_xml(msg_body)
      puts "----> #{Time.now} received msg is #{m.command}" # if @debug
      msg=StompMessage::JmsMessage.new(msg_body,msg_hash)
      puts "jms message: #{msg.inspect}" if @debug
    
      # Dispatch on the command name; r is expected to be [reply_wanted, reply].
      r=send(m.command, m, msg)
       duration=Time.now-start_time
       puts "--- Message processing before reply: #{self.topic}: #{m.command} : #{duration}"
      send_reply(msg.headers,r[1])    if r[0]==true
      duration=Time.now-start_time
    
      puts "--- Message processed #{self.topic}: #{m.command} : #{duration}"
        m=nil
    rescue Exception => e
       self.exception_count+=1
       handle_exception(e)
    end
   else
     puts "----> received message from myself...ignoring #{msg_hash['JMSOrigin']}"
   end   #JMS Origin if
   msg_body=msg_hash=nil
   puts "<-----finished onMesage"
  # puts "Message is: #{m.to_xml}"  if @debug
    # effectively case statement  (Should SCREEN HERE)
  # puts "sms text is: #{sms.text} dest is: #{sms.destination} result is: #{res}"
end

#reconnectObject



122
123
124
125
126
127
128
129
# File 'lib/stomp_message/stomp_server.rb', line 122

# Drops the current broker connection (if any), opens a new one and
# subscribes to the topic again.
def reconnect
  puts "about to reconnect"
  stale = self.conn
  stale.close unless stale == nil
  connect_connection
  connect_topic
end

#runObject



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
# File 'lib/stomp_message/stomp_server.rb', line 363

# Main loop for the STOMP (non-Java) mode.
#
# Starts thread_count worker threads. Each worker registers its
# thread-specific state and then loops forever: pop a message from queue,
# pass it to handle_message, and on any exception increment exception_count
# and call handle_exception. After starting the workers it starts the
# monitor thread and joins every thread in mythreads, so this method blocks
# for as long as those threads run.
def run
     1.upto(self.thread_count)   { |c|  # create the threads here
       #puts "creating thread: #{c}"
       self.mythreads << Thread.new(c)   { |ctmp| 
          setup_thread_specific_items(ctmp)
          while true
           
             begin	
     	        msg=self.queue.pop
               handle_message(msg)   
              
              rescue Exception => e
                self.exception_count+=1
                handle_exception(e)
             end
        #   Thread.pass
          end
         }
       }
      monitor_queue_status
      self.mythreads.each { |t| t.join }
     
     
    #  msg = self.conn.receive
   	#       puts "after receive"
    #  self.msg_count+=1
    #  begin	
   #     handle_message(msg)      
    #   rescue Exception => e
   #      self.exception_count+=1
    #   puts "exception found #{e.backtrace}" 
   #    puts "exception messag #{e.message}"
    #  end
    # end  # while

  # t.join  # wait for t to die..
end

#send_reply(headers, msg) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/stomp_message/stomp_server.rb', line 184

# Sends a reply message back to the requester.
#
# headers - header hash of the request; 'reply-to', 'id' and 'JMSReplyTo'
#           are read from it.
# msg     - reply object; its to_xml result is the body that is sent.
#
# In Java mode the reply goes out through jms_send_message inside
# jms_message_handling, with the request headers. Otherwise it is sent on
# conn to the 'reply-to' destination with a response header holding
# 'persistent' => 'false' plus the request's 'id' and 'JMSReplyTo'.
def send_reply(headers,msg)
  reply_topic = headers['reply-to']
  # The original also had "response_header.merge headers" here. Hash#merge
  # returns a new hash and the result was discarded, so it never changed what
  # was sent; the dead statement is dropped.
  # NOTE(review): if copying all request headers was intended, use merge! -
  # that changes the outgoing frame, so confirm with the broker setup first.
  response_header = {
    'persistent' => 'false',
    'id'         => headers['id'],
    'JMSReplyTo' => headers['JMSReplyTo']
  }
  puts "SEND REPLY #{response_header.inspect}" if @debug
  puts "<----- reply to follow:"
  if self.java?
    jms_message_handling(@ss_jms_dest, @ss_jms_conn) { jms_send_message(@ss_session, @ss_producer, headers, msg.to_xml) }
  else
    self.conn.send(reply_topic, msg.to_xml, response_header)
  end
end

#server_shutdownObject



140
141
142
# File 'lib/stomp_message/stomp_server.rb', line 140

# Shutdown hook called by jms_close_connections after the JMS resources are
# closed. This implementation only logs.
def server_shutdown
  puts "----in server shutdown"
end

#setup_auto_closeObject



133
134
135
136
137
138
139
# File 'lib/stomp_message/stomp_server.rb', line 133

# Registers an at_exit hook for the STOMP mode that logs, unsubscribes from
# the topic (close_topic) and closes the connection (disconnect_stomp).
# Called once from initialize when java? is false.
def setup_auto_close
  at_exit { puts "#{self.class}: in at exit block"
            close_topic
            disconnect_stomp 
           # self.mythreads.each {|t| t.raise "auto exit" }
          }
end

#setup_thread_specific_items(mythread_number) ⇒ Object



352
353
354
355
356
357
358
359
360
361
362
# File 'lib/stomp_message/stomp_server.rb', line 352

# Prepares per-thread state for the calling thread.
#
# mythread_number - value stored as the thread-local :id and embedded in the
#                   thread-local :name ("Thread: <value>").
#
# Also resets this thread's slot in variables (indexed by get_id) to an
# empty hash and runs check_thread. Returns nil.
def setup_thread_specific_items(mythread_number)
  puts "----> entering setup threads #{mythread_number}" # if @debug
  current = Thread.current
  current[:name] = "Thread: #{mythread_number}"
  current[:id] = mythread_number
  self.variables[get_id] = {}

  check_thread
  return unless @debug
  puts "<---- leaving setup threads"
end

#stomp_DEBUG(msg, stomp_msg) ⇒ Object



251
252
253
254
255
256
# File 'lib/stomp_message/stomp_server.rb', line 251

# Command handler: toggles the @debug flag and logs the new value.
#
# msg, stomp_msg - unused; present because every command handler is called
#                  with (parsed_message, frame).
#
# Returns [false, ""], i.e. no reply is requested.
def stomp_DEBUG(msg,stomp_msg)
  @debug = @debug ? false : true
  puts "debug flag is now #{@debug}"
  [false, ""]
end

#stomp_PING(msg, stomp_msg) ⇒ Object



229
230
231
232
233
234
235
# File 'lib/stomp_message/stomp_server.rb', line 229

# Command handler: builds an "ALIVE" status reply.
#
# msg, stomp_msg - unused.
#
# The status line carries class, object id, topic, host name, current
# thread, message and exception counters and the current time; when @debug
# is set the object's inspect output is appended.
#
# Returns [true, reply] where reply is a 'stomp_REPLY'
# StompMessage::Message holding the status text.
def stomp_PING(msg,stomp_msg)
  status = "ALIVE Class: #{self.class.to_s} id: #{self.object_id} " \
           "listening to: #{self.topic} on host #{@my_hostname} " \
           "id  thread #{Thread.current.inspect}  " \
           "msg_count #{self.msg_count} exception_count #{self.exception_count} " \
           "time: #{Time.now}\n"
  status << "Object details  #{self.inspect}" if @debug
  [true, StompMessage::Message.new('stomp_REPLY', status)]
end

#stomp_RECONNECT(msg, stomp_msg) ⇒ Object



243
244
245
246
247
248
249
250
# File 'lib/stomp_message/stomp_server.rb', line 243

# Command handler: re-establishes the broker connection and reports status.
#
# msg, stomp_msg - unused.
#
# Calls reconnect first, then builds a status line that ends with the
# connection's open? state.
#
# Returns [true, reply] where reply is a 'stomp_REPLY'
# StompMessage::Message holding the status text.
def stomp_RECONNECT(msg,stomp_msg)
  reconnect

  status = "ALIVE Class: #{self.class.to_s} listing to: #{self.topic} on host #{@my_hostname} " \
           "msg_count #{self.msg_count} exception_count #{self.exception_count} " \
           "id #{self.inspect} thread #{Thread.current.inspect} " \
           "connection status #{self.conn.open?}"
  [true, StompMessage::Message.new('stomp_REPLY', status)]
end

#version_numberObject



97
98
99
100
# File 'lib/stomp_message/stomp_server.rb', line 97

# Version string of the top-level module this object's class lives in.
#
# Takes the first segment of the class name (e.g. "StompMessage" for
# StompMessage::StompServer) and returns that module's VERSION::STRING
# constant, so the module must define VERSION::STRING.
#
# The lookup uses const_get instead of eval on a built string: no code is
# evaluated, and a missing constant raises NameError.
def version_number
  module_name = self.class.to_s.split('::')[0]
  Object.const_get(module_name).const_get(:VERSION).const_get(:STRING)
end