Class: EstormMessageProcessor::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/estorm-message-processor/base.rb

Constant Summary collapse

@@mt_id =

MT id counter.

0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def channel
  @channel
end

#connObject (readonly)

Returns the value of attribute conn.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def conn
  @conn
end

#consumerObject (readonly)

Returns the value of attribute consumer.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def consumer
  @consumer
end

#queueObject (readonly)

Returns the value of attribute queue.



7
8
9
# File 'lib/estorm-message-processor/base.rb', line 7

def queue
  @queue
end

Class Method Details

.loggerObject



10
11
12
# File 'lib/estorm-message-processor/base.rb', line 10

def Base.logger
  @@logger
end

.logger=(logger) ⇒ Object



14
15
16
# File 'lib/estorm-message-processor/base.rb', line 14

def Base.logger=(logger)
  @@logger = logger
end

Instance Method Details

#loggerObject



18
19
20
# File 'lib/estorm-message-processor/base.rb', line 18

def logger
  @@logger
end

#queue_mgmt(config) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/estorm-message-processor/base.rb', line 51

def queue_mgmt(config)
   msg= "[*] Waiting for messages in #{@queue.name}.  blocking is #{config[:blocking]}"
   logger.info msg
   count=0
 #  @channel.prefetch(1)   # set quality of service to only delivery one message at a time....
   msg_count,consumer_count = @consumer.queue_statistics  # just to get the stats before entering hte queue
#  @queue.subscribe(:block => config[:blocking]) do |delivery_info, properties, body|
  @consumer.on_delivery() do |delivery_info, , payload|
     @consumer.process_messages(delivery_info,,payload)
#     @consumer.channel.acknowledge(delivery_info.delivery_tag, false) if @consumer.channel!=nil && @consumer.channel.open?
     msg=   "ON DELIVERY: #{@consumer.count}: messages processed"
     logger.info msg
      
        # ack the message to get the next message
     #msg_count,consumer_count = @consumer.queue_statistics  # POSSIBLE RACE CONDITION
    # @consumer.cancel if msg_count==0 && config[:exit_when_empty]
    end
end

#setup_bunny_communications(url, flag, queuename) ⇒ Object



28
29
30
31
32
33
34
35
36
# File 'lib/estorm-message-processor/base.rb', line 28

def setup_bunny_communications(url,flag,queuename)
  @client=EstormMessageProcessor::Client.new
  @conn,@channel=@client.setup_bunny(url,flag)
  raise "connection problem with #{@client.inspect}" if @conn==nil
  @channel   = @conn.create_channel
  @queue   = @channel.queue(queuename)
  msg= "set up active MQ on #{queuename}"
  logger.info msg
end

#start(config) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/estorm-message-processor/base.rb', line 71

def start(config)
  msg= "Connecting to bunny environment #{config.inspect}"
  logger.info msg
  setup_bunny_communications(config[:url],config[:connecturlflag],config[:queuename])
  #@consumer=EstormMessageProcessor::Consumer.new(@channel, @queue, config[:consumer_name], true, false, config)
  @consumer=EstormMessageProcessor::Consumer.new(@channel, @queue)
  
  @consumer.logger=logger
  raise "consumer creation problem" if @consumer==nil
  msg_count,consumer_count =@consumer.queue_statistics
  queue_mgmt(config)  
  # the block flag shuts down the thread. the timeout values says whether to unsubscriber
  #need to set ack to true to manage the qos parameter
 # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking], :timeout => config[:timeout])
 #  retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking])
 retval= @queue.subscribe_with(@consumer, :block => config[:blocking])
 # loop do   
      #should loop forever if blocking... otherwise needs  a loop
  #   sleep 1
 #  end
  msg= "Ending======about to tear_down_bunny [retval: #{retval}]...."
  logger.info msg
  tear_down_bunny
    
end

#tear_down_bunnyObject



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/estorm-message-processor/base.rb', line 38

def tear_down_bunny
   if @conn!=nil && @conn.open? && @channel!=nil && @channel.open?
     sleep 1
     @consumer.cancel if @consumer!=nil && !@consumer.cancelled?
     sleep 1
   #  @queue.unsubscribe
   #  @conn.close if @channel != nil && @channel.open?
   #  sleep 0.5
   end
   msg= "closing bunny"
   logger.info msg
end