Module: StompMessage::JmsTools

Included in:
StompSendTopic, StompServer
Defined in:
lib/stomp_message/jms_tools.rb

Defined Under Namespace

Classes: MyListener

Instance Method Summary collapse

Instance Method Details

#define_source_id ⇒ Object



39
40
41
42
43
44
# File 'lib/stomp_message/jms_tools.rb', line 39

# Stores a pseudo-random numeric String in @source_id.
#
# Two rand draws are combined (one scaled by 10000, one by 400, plus 1); the
# sum is truncated to an integer and kept as a String. The value is random,
# so it is not guaranteed to be unique between instances.
# Open question carried over from the original author: maybe the producer id
# should be used instead.
#
# Returns nil (the value of the guarded trace line).
def define_source_id
  coarse_part = rand * 10000
  fine_part = rand * 400
  @source_id = (coarse_part + fine_part + 1).to_i.to_s
  puts "--- source id #{@source_id}" if @JmsTools_debug
end

#jms_close_producer_session(tprod, tsession) ⇒ Object



257
258
259
260
261
262
263
# File 'lib/stomp_message/jms_tools.rb', line 257

# Closes a producer and then its session, skipping whichever one is nil.
# The session is not committed first (the commit call is disabled).
#
# tprod    - object responding to #close, or nil
# tsession - object responding to #close, or nil
#
# Returns nil.
def jms_close_producer_session(tprod, tsession)
  tracing = @JmsTools_debug
  puts "----> in jms close producer session #{Time.now}" if tracing
  # Producer first, then the session.
  [tprod, tsession].each do |resource|
    resource.close unless resource.nil?
  end
  puts "<--- in jms close producer session #{Time.now}" if tracing
end

#jms_create_consumer(session, dest) ⇒ Object



228
229
230
231
232
233
234
235
236
237
# File 'lib/stomp_message/jms_tools.rb', line 228

# Creates a consumer for +dest+ on +session+, attaches a MyListener that
# points back at this object, and starts the connection held in @my_conn.
#
# session - object responding to #create_consumer
# dest    - destination handed to #create_consumer
#
# Returns the new consumer.
def jms_create_consumer(session, dest)
  puts "----> in create consumer" if @JmsTools_debug
  new_consumer = session.create_consumer(dest)
  callback = MyListener.new.tap { |listener| listener.set_self(self) }
  new_consumer.setMessageListener(callback)
  # @my_conn must already have been set (jms_create_session and
  # jms_create_consumer_session both assign it).
  @my_conn.start
  puts "<----- returning create consumer" if @JmsTools_debug
  new_consumer
end

#jms_create_consumer_session(tdest, cconn) ⇒ Object



238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/stomp_message/jms_tools.rb', line 238

# Opens a non-transacted, auto-acknowledge session on +cconn+, creates a
# consumer for +tdest+ with a MyListener bound to this object, remembers
# +cconn+ in @my_conn and starts it.
#
# tdest - destination to consume from
# cconn - connection responding to #create_session and #start
#
# Returns [consumer, session].
def jms_create_consumer_session(tdest, cconn)
  puts "----> in create consumer session" if @JmsTools_debug
  # The original author left a reminder to double-check the AUTO_ACKNOWLEDGE choice.
  consumer_session = cconn.create_session(false, Session::AUTO_ACKNOWLEDGE)
  new_consumer = consumer_session.create_consumer(tdest)
  callback = MyListener.new.tap { |listener| listener.set_self(self) }
  new_consumer.setMessageListener(callback)
  @my_conn = cconn
  @my_conn.start
  puts "<----- returning create consumer session" if @JmsTools_debug
  [new_consumer, consumer_session]
end

#jms_create_destination_connection(name) ⇒ Object



264
265
266
267
268
269
270
271
272
# File 'lib/stomp_message/jms_tools.rb', line 264

# Looks +name+ up in the naming context (@JmsTools_ctx) and creates a new
# connection from @JmsTools_conn_factory. Unlike jms_create_session this
# does not store the connection in @my_conn.
#
# name - lookup name of the destination
#
# Returns [destination, connection].
def jms_create_destination_connection(name)
  puts "----> in create destination connection #{Time.now}" if @JmsTools_debug
  pair = [@JmsTools_ctx.lookup(name), @JmsTools_conn_factory.create_connection]
  if @JmsTools_debug
    puts "----- dest #{pair.first.inspect} conn #{pair.last.inspect}"
    puts "<----- destination connection #{Time.now}"
  end
  pair
end

#jms_create_producer(session, dest) ⇒ Object



197
198
199
200
201
202
# File 'lib/stomp_message/jms_tools.rb', line 197

# Creates a producer for +dest+ on +session+.
#
# session - object responding to #create_producer
# dest    - destination handed to #create_producer
#
# Returns the new producer.
def jms_create_producer(session, dest)
  puts "----> in create producer #{Time.now}" if @JmsTools_debug
  session.create_producer(dest).tap do
    puts "<----- returning create producer #{Time.now}" if @JmsTools_debug
  end
end

#jms_create_producer_session(tdest, cconn) ⇒ Object



250
251
252
253
254
255
256
# File 'lib/stomp_message/jms_tools.rb', line 250

# Opens a non-transacted, auto-acknowledge session on +cconn+ and creates a
# producer for +tdest+ on it.
#
# tdest - destination to produce to
# cconn - connection responding to #create_session
#
# Returns [producer, session].
def jms_create_producer_session(tdest, cconn)
  puts "----> in create producer session #{Time.now}" if @JmsTools_debug
  # The original author left a reminder to double-check the AUTO_ACKNOWLEDGE choice.
  producer_session = cconn.create_session(false, Session::AUTO_ACKNOWLEDGE)
  new_producer = producer_session.create_producer(tdest)
  puts "<----- returning create session #{Time.now}" if @JmsTools_debug
  [new_producer, producer_session]
end

#jms_create_session(name) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/stomp_message/jms_tools.rb', line 274

# Looks up the destination +name+, creates a connection from
# @JmsTools_conn_factory (remembered in @my_conn) and opens a non-transacted,
# auto-acknowledge session on it. The connection is not started here.
#
# name - lookup name of the destination
#
# Returns [destination, connection, session].
def jms_create_session(name)
  tracing = @JmsTools_debug
  puts "----> in create session #{Time.now}" if tracing
  destination = @JmsTools_ctx.lookup(name)
  @my_conn = @JmsTools_conn_factory.create_connection
  # Connection-level throughput tuning (e.g. delivery mode) would go here;
  # the original author also considered DUPS_OK_ACKNOWLEDGE for the session.
  session = @my_conn.create_session(false, Session::AUTO_ACKNOWLEDGE)
  if tracing
    puts "----- dest #{destination.inspect} conn #{@my_conn.inspect}"
    puts "<----- returning create session #{Time.now}"
  end
  [destination, @my_conn, session]
end

#jms_kill_logging ⇒ Object



116
117
118
119
120
121
122
123
124
125
# File 'lib/stomp_message/jms_tools.rb', line 116

# Walks every logger name known to java.util.logging's LogManager and
# fetches the corresponding Logger.
#
# NOTE(review): the level is only set to OFF when ENV is nil. ENV is never
# nil, so as written this method does not switch any logger off. Confirm
# whether that is intentional before changing the guard.
#
# Returns nil.
def jms_kill_logging
  puts "in disabling logging" if @JmsTools_debug
  names = java.util.logging.LogManager.log_manager.logger_names
  while names.hasMoreElements
    current = java.util.logging.Logger.getLogger(names.nextElement)
    puts "logger is: #{current.inspect}" if @JmsTools_debug
    current.level = java.util.logging.Level::OFF if ENV.nil?
  end
  nil
end

#jms_manage_headers(headers) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/stomp_message/jms_tools.rb', line 86

# Normalises STOMP-style header names to their JMS equivalents, in place.
#
# Recognised aliases (the key is compared after #to_s):
#   correlation-id / JMSCorrelationID -> 'JMSCorrelationID' (String)
#   expires        / JMSExpiration    -> 'JMSExpiration'    (Integer via #to_i)
#   persistent     / JMSDeliveryMode  -> 'JMSDeliveryMode'  (2 or 1)
#   priority       / JMSPriority      -> 'JMSPriority'      (Integer via #to_i)
#   type           / JMSType          -> 'JMSType'          (String)
# Any other key is stored under its String name with a String value.
# The original entries are left in the hash. 'JMSOrigin' is always set to
# unique_source_id.to_s. 'JMSReplyTo' is deliberately left untouched (the
# original author's reply-to handling is disabled).
#
# Delivery mode: 'persistent' yields 1 for nil, false or the text "false"
# (any case), otherwise 2. 'JMSDeliveryMode' yields 1 for nil, false, 1 or
# "1", otherwise 2, so an already-normalised value of 1 is kept.
#
# headers - Hash of header name => value; mutated and returned.
#
# Returns the same Hash instance.
def jms_manage_headers(headers)
  # Walk a snapshot: the loop adds keys to +headers+, and Ruby raises
  # RuntimeError when a Hash gains a key while it is being iterated.
  headers.to_a.each do |k, value|
    key = k.to_s
    case key
    when 'correlation-id', 'JMSCorrelationID'
      headers['JMSCorrelationID'] = value.to_s
    when 'expires', 'JMSExpiration'
      headers['JMSExpiration'] = value.to_i
    when 'persistent'
      persistent = value && value.to_s.strip.downcase != 'false'
      headers['JMSDeliveryMode'] = persistent ? 2 : 1
    when 'JMSDeliveryMode'
      persistent = value && value.to_s.strip != '1'
      headers['JMSDeliveryMode'] = persistent ? 2 : 1
    when 'priority', 'JMSPriority'
      headers['JMSPriority'] = value.to_i
    when 'type', 'JMSType'
      headers['JMSType'] = value.to_s
    else
      # The original author questioned whether stringifying is the most
      # appropriate fallback; behaviour kept as-is.
      headers[key] = value.to_s
    end
  end
  headers['JMSOrigin'] = unique_source_id.to_s
  if @JmsTools_debug
    puts " --- headers reply #{ headers['JMSReplyTo']} source #{@source_id} id #{headers['JMSMessageID']}"
    puts "--- headers #{headers.to_s}"
  end
  headers
end

#jms_message_for_me(msg) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/stomp_message/jms_tools.rb', line 203

# Dispatches a reply addressed to this instance to the block registered for
# its transaction id, then forgets that block.
#
# A message is "for me" when its 'JMSReplyTo' string property equals
# unique_source_id. Both sides are compared as Strings: the property is
# written with #to_s on the sending side, so a non-String source id (for
# example an Integer) would otherwise never match.
#
# The handler is looked up in @response_block_list under the message's 'id'
# property. If none is registered a notice is printed regardless of the debug
# flag. The whole check runs while holding @my_jms_guard2, including the
# handler call itself.
#
# msg - object responding to #getStringProperty
#
# Returns nil.
def jms_message_for_me(msg)
  @my_jms_guard2.synchronize do
    if msg.getStringProperty('JMSReplyTo').to_s == unique_source_id.to_s
      puts "message for ME...YAHOO" if @JmsTools_debug
      tx_key = msg.getStringProperty('id').to_s
      handler = @response_block_list[tx_key]
      if handler.nil?
        puts "NO BLOCK FOUND FOR MY MESSAGE #{tx_key}"
      else
        # Call first, delete afterwards: if the handler raises, the entry stays.
        handler.call(msg)
        @response_block_list.delete(tx_key)
      end
    elsif @JmsTools_debug
      puts "found message for another person---discard"
    end
    nil
  end
end

#jms_my_id ⇒ Object



48
49
50
# File 'lib/stomp_message/jms_tools.rb', line 48

# Reader for the source id stored in @source_id (same value that
# unique_source_id returns).
def jms_my_id
  return @source_id
end

#jms_next_transaction_id ⇒ Object



32
33
34
35
36
37
38
# File 'lib/stomp_message/jms_tools.rb', line 32

# Increments @transaction_id under @my_jms_guard_tx and returns the new value.
#
# The returned id is the value produced inside the lock, not a later re-read
# of @transaction_id, so concurrent callers each receive their own id.
#
# Returns the incremented transaction id (Integer).
def jms_next_transaction_id
  puts "--- before trans id #{@transaction_id}" if @JmsTools_debug
  next_id = @my_jms_guard_tx.synchronize { @transaction_id += 1 }
  puts "--- after increment trans id #{next_id}" if @JmsTools_debug
  next_id
end

#jms_on_message(msg) ⇒ Object



219
220
221
222
223
224
225
226
227
# File 'lib/stomp_message/jms_tools.rb', line 219

# Entry point for an incoming message: drops messages this instance sent
# itself and hands everything else to jms_message_for_me.
#
# The 'JMSOrigin' string property is compared with jms_my_id as Strings. The
# property is written with #to_s on the sending side, so a non-String source
# id (for example an Integer) would otherwise never be recognised as "mine".
#
# msg - object responding to #getStringProperty
#
# Returns nil.
def jms_on_message(msg)
  puts "-----> jms on message" if @JmsTools_debug
  if msg.getStringProperty('JMSOrigin').to_s == jms_my_id.to_s
    puts "received message from myself--- discard" if @JmsTools_debug
  else
    jms_message_for_me(msg)
  end
  puts "<----- jms on message" if @JmsTools_debug
end

#jms_persistent ⇒ Object



126
127
128
# File 'lib/stomp_message/jms_tools.rb', line 126

# Delivery mode applied to outgoing messages. Returns 1; per the original
# author's note, 2 would select persistent delivery.
def jms_persistent
  delivery_mode = 1 # 2 is persistent
  delivery_mode
end

#jms_send_ack(session, producer, tmp_text, ttimeout = 75, rawflag = false) ⇒ Object

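Sends a message and waits for its reply. By default only the reply's body is returned; pass rawflag = true to get the whole parsed message back instead.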



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/stomp_message/jms_tools.rb', line 130

# Sends +tmp_text+ through jms_send_message and blocks until the reply block
# has run or +ttimeout+ seconds have passed.
#
# The reply text is parsed with StompMessage::Message.load_xml. While waiting,
# the calling thread polls a flag set by the reply block, sleeping 0.01s,
# 0.02s, ... between checks; once the counter passes 100 it is reset to 5 so
# the sleep never grows much beyond one second. The whole call runs while
# holding @my_jms_guard.
#
# NOTE(review): on timeout the reply block registered by jms_send_message is
# not removed here; verify whether it should be cleaned up.
#
# session  - passed through to jms_send_message
# producer - passed through to jms_send_message
# tmp_text - message text to send
# ttimeout - seconds to wait for the reply (default 75)
# rawflag  - when true, return the parsed message instead of its body
#
# Returns the reply body (or the parsed message when rawflag is set), or the
# String "timeout waiting for response" when no reply arrived in time.
def jms_send_ack(session, producer, tmp_text, ttimeout = 75, rawflag = false)
  reply = ''
  @my_jms_guard.synchronize do
    begin
      Timeout.timeout(ttimeout) do
        answered = false
        jms_send_message(session, producer, {}, tmp_text) do |response|
          parsed = StompMessage::Message.load_xml(response.get_text)
          body = parsed.body
          reply = rawflag ? parsed : body
          answered = true
        end
        attempts = 1
        until answered
          pause = attempts * 0.01
          attempts = 5 if attempts > 100 # don't let the back-off grow without bound
          sleep(pause)
          attempts += 1
        end
      end
    rescue Timeout::Error
      reply = "timeout waiting for response"
    end
  end
  reply
end

#jms_send_message(session, producer, initial_headers, txt, &block) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/stomp_message/jms_tools.rb', line 166

# Builds a text message from +txt+ and +initial_headers+ and sends it through
# +producer+. Runs while holding @my_jms_guard3.
#
# A new transaction id is taken for every call. When +initial_headers+ has no
# 'JMSReplyTo' entry, its 'id' entry is set to that transaction id; otherwise
# the existing 'id' is kept. Headers are normalised by jms_manage_headers
# (which works on +initial_headers+ itself) and every entry is copied to the
# message as a string property. The delivery mode comes from jms_persistent.
#
# When a block is given, 'JMSReplyTo' is set to unique_source_id and the block
# is stored in @response_block_list under the transaction id. If the send then
# fails, the stored block is removed again before the error propagates.
#
# All trace output, including the final line, is printed only in debug mode.
#
# session         - object responding to #create_text_message
# producer        - object responding to #send
# initial_headers - Hash of headers; mutated
# txt             - message text
# block           - optional reply handler, called with the reply message
#
# Returns nil.
def jms_send_message(session, producer, initial_headers, txt, &block)
  @my_jms_guard3.synchronize do
    msg = session.create_text_message
    tx_key = jms_next_transaction_id.to_s
    if initial_headers['JMSReplyTo'].nil?
      initial_headers['id'] = tx_key
    elsif @JmsTools_debug
      puts "reply message id is #{initial_headers['id']}"
    end
    headers = jms_manage_headers(initial_headers)
    sent = false
    begin
      if block
        puts "----> adding #{tx_key} to my block list msg txt #{txt}" if @JmsTools_debug
        headers['JMSReplyTo'] = unique_source_id
        @response_block_list[tx_key] = block
      end
      headers['JMSOrigin'] = unique_source_id.to_s
      headers.each { |k, v| msg.setStringProperty(k.to_s, v.to_s) }
      msg.setJMSDeliveryMode(jms_persistent) # 2 is persistent
      msg.set_text(txt)
      producer.send(msg)
      sent = true
    ensure
      # Nothing was sent, so no reply can arrive for this id: drop the handler.
      @response_block_list.delete(tx_key) if block && !sent
    end
    if @JmsTools_debug
      puts "---> jms_send_message sending #{txt}"
      puts " ----> jms_send_message"
    end
    nil
  end
end

#jms_set_debug(val) ⇒ Object



52
53
54
55
# File 'lib/stomp_message/jms_tools.rb', line 52

# Sets the debug flag that gates the trace output of this module.
# A nil +val+ is stored as false; any other value is stored unchanged.
#
# Returns the stored value.
def jms_set_debug(val)
  @JmsTools_debug = val.nil? ? false : val
end

#jms_shutdown(tdest, tconn, tsession, tproducer, tconsumer = nil) ⇒ Object



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/stomp_message/jms_tools.rb', line 290

# Closes the given JMS objects and the naming context, then clears this
# module's state (@my_conn, @JmsTools_ctx, @response_block_list and the four
# mutexes).
#
# Close order: producer, consumer, session, then the connection (stopped and
# closed once). nil arguments are skipped. The naming context is only closed
# when it is still set, so calling jms_shutdown a second time is harmless.
# @JmsTools_conn_factory is left as it is.
#
# tdest     - destination; accepted for interface compatibility, not used
# tconn     - connection responding to #stop and #close, or nil
# tsession  - session responding to #close, or nil
# tproducer - producer responding to #close, or nil
# tconsumer - consumer responding to #close, or nil (default)
#
# Returns nil.
def jms_shutdown(tdest, tconn, tsession, tproducer, tconsumer = nil)
  puts "----> in shutdown " if @JmsTools_debug
  puts "conn: #{tconn.inspect} session #{tsession.inspect} producer #{tproducer.inspect}" if @JmsTools_debug
  @my_conn = nil
  [tproducer, tconsumer, tsession].each do |resource|
    resource.close unless resource.nil?
  end
  unless tconn.nil?
    tconn.stop
    tconn.close
  end
  @JmsTools_ctx.close unless @JmsTools_ctx.nil?
  @JmsTools_ctx = nil
  @response_block_list = nil
  @my_jms_guard = @my_jms_guard2 = @my_jms_guard3 = @my_jms_guard_tx = nil
  puts "<----- returning shutdown " if @JmsTools_debug
end

#jms_start(factory, src_id = nil) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/stomp_message/jms_tools.rb', line 56

# Initialises the module state: debug flag (switched off), the four mutexes,
# the source id, the naming context, the connection factory, a random
# starting transaction id and an empty response-block table.
#
# The naming context is an InitialContext configured with the file-system
# provider URL "file:///opt/local/imqobjects" and the RefFSContextFactory;
# +factory+ is looked up in it. A start banner is always printed, and
# jms_kill_logging is invoked whenever debug is off.
#
# factory - lookup name of the connection factory
# src_id  - source id to use; when nil one is generated by define_source_id
#
# Returns nil.
def jms_start(factory, src_id = nil)
  jms_set_debug(false)
  @my_jms_guard, @my_jms_guard2, @my_jms_guard3, @my_jms_guard_tx = Array.new(4) { Mutex.new }

  # Callers running several instances can pass their own id.
  @source_id = src_id
  define_source_id if src_id.nil?
  puts "-----> JMSTools start #{Time.now}"
  jms_kill_logging unless @JmsTools_debug

  @JmsTools_current_id = 0
  jndi_settings = Hashtable.new(2)
  # An IIOP provider ("iiop://127.0.0.1:3700" with S1ASCtxFactory) was the
  # original author's alternative to the file-system context used here.
  [
    [Context::PROVIDER_URL, "file:///opt/local/imqobjects"],
    [Context::INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"]
  ].each { |setting, value| jndi_settings.put(setting, value) }

  @JmsTools_ctx = InitialContext.new(jndi_settings)
  puts "after initial context #{@JmsTools_ctx.inspect}" if @JmsTools_debug
  @JmsTools_conn_factory = @JmsTools_ctx.lookup(factory)
  if @JmsTools_ctx.nil? || @JmsTools_conn_factory.nil?
    puts "PROBLEM jms_start context:  #{@JmsTools_ctx.inspect} factory: #{@JmsTools_conn_factory} "
  end

  # Random starting point for transaction ids.
  @transaction_id = (rand * 400 + 1).to_i
  @response_block_list = {}
  puts "<------ ending start #{Time.now}" if @JmsTools_debug
end

#unique_source_id ⇒ Object



45
46
47
# File 'lib/stomp_message/jms_tools.rb', line 45

# Reader for the source id stored in @source_id; this is the value written
# into the 'JMSOrigin' and 'JMSReplyTo' headers of outgoing messages.
def unique_source_id
  return @source_id
end