Class: EstormMessageProcessor::Base
- Inherits:
-
Object
- Object
- EstormMessageProcessor::Base
- Defined in:
- lib/estorm-message-processor/base.rb
Constant Summary collapse
- @@mt_id =
MT id counter.
0
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
-
#consumer ⇒ Object
readonly
Returns the value of attribute consumer.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Class Method Summary collapse
Instance Method Summary collapse
- #logger ⇒ Object
- #queue_mgmt(config) ⇒ Object
- #setup_bunny_communications(url, flag, queuename) ⇒ Object
- #start(config) ⇒ Object
- #tear_down_bunny ⇒ Object
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7 def channel @channel end |
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7 def conn @conn end |
#consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7 def consumer @consumer end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
7 8 9 |
# File 'lib/estorm-message-processor/base.rb', line 7 def queue @queue end |
Class Method Details
.logger ⇒ Object
10 11 12 |
# File 'lib/estorm-message-processor/base.rb', line 10 def Base.logger @@logger end |
.logger=(logger) ⇒ Object
14 15 16 |
# File 'lib/estorm-message-processor/base.rb', line 14 def Base.logger=(logger) @@logger = logger end |
Instance Method Details
#logger ⇒ Object
18 19 20 |
# File 'lib/estorm-message-processor/base.rb', line 18 def logger @@logger end |
#queue_mgmt(config) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/estorm-message-processor/base.rb', line 51 def queue_mgmt(config) msg= "[*] Waiting for messages in #{@queue.name}. blocking is #{config[:blocking]}" logger.info msg count=0 # @channel.prefetch(1) # set quality of service to only delivery one message at a time.... msg_count,consumer_count = @consumer.queue_statistics # just to get the stats before entering hte queue # @queue.subscribe(:block => config[:blocking]) do |delivery_info, properties, body| @consumer.on_delivery() do |delivery_info, , payload| @consumer.(delivery_info,,payload) # @consumer.channel.acknowledge(delivery_info.delivery_tag, false) if @consumer.channel!=nil && @consumer.channel.open? msg= "ON DELIVERY: #{@consumer.count}: messages processed" logger.info msg # ack the message to get the next message #msg_count,consumer_count = @consumer.queue_statistics # POSSIBLE RACE CONDITION # @consumer.cancel if msg_count==0 && config[:exit_when_empty] end end |
#setup_bunny_communications(url, flag, queuename) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/estorm-message-processor/base.rb', line 28 def setup_bunny_communications(url,flag,queuename) @client=EstormMessageProcessor::Client.new @conn,@channel=@client.setup_bunny(url,flag) raise "connection problem with #{@client.inspect}" if @conn==nil @channel = @conn.create_channel @queue = @channel.queue(queuename) msg= "set up active MQ on #{queuename}" logger.info msg end |
#start(config) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/estorm-message-processor/base.rb', line 71 def start(config) msg= "Connecting to bunny environment #{config.inspect}" logger.info msg setup_bunny_communications(config[:url],config[:connecturlflag],config[:queuename]) #@consumer=EstormMessageProcessor::Consumer.new(@channel, @queue, config[:consumer_name], true, false, config) @consumer=EstormMessageProcessor::Consumer.new(@channel, @queue) @consumer.logger=logger raise "consumer creation problem" if @consumer==nil msg_count,consumer_count =@consumer.queue_statistics queue_mgmt(config) # the block flag shuts down the thread. the timeout values says whether to unsubscriber #need to set ack to true to manage the qos parameter # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking], :timeout => config[:timeout]) # retval= @queue.subscribe_with(@consumer,:ack => true, :block => config[:blocking]) retval= @queue.subscribe_with(@consumer, :block => config[:blocking]) # loop do #should loop forever if blocking... otherwise needs a loop # sleep 1 # end msg= "Ending======about to tear_down_bunny [retval: #{retval}]...." logger.info msg tear_down_bunny end |
#tear_down_bunny ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/estorm-message-processor/base.rb', line 38 def tear_down_bunny if @conn!=nil && @conn.open? && @channel!=nil && @channel.open? sleep 1 @consumer.cancel if @consumer!=nil && !@consumer.cancelled? sleep 1 # @queue.unsubscribe # @conn.close if @channel != nil && @channel.open? # sleep 0.5 end msg= "closing bunny" logger.info msg end |