Class: RServiceBus::MQ_RabbitMq
Overview
Beanstalk client implementation.
Instance Attribute Summary
Attributes inherited from MQ
Instance Method Summary collapse
-
#ack ⇒ Object
“Commit” queue.
-
#connect(host, port) ⇒ Object
Connect to the broker.
-
#pop ⇒ Object
Get next msg from queue.
- #returnToQueue ⇒ Object
-
#send(queueName, msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are healthy.
-
#subscribe(queuename) ⇒ Object
Connect to the queue.
Methods inherited from MQ
Constructor Details
This class inherits a constructor from RServiceBus::MQ
Instance Method Details
#ack ⇒ Object
“Commit” queue
63 64 65 66 67 68 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 63 def ack @ch.ack( @delivery_info.delivery_tag ) @delivery_info = nil @properties = nil @payload = nil end |
#connect(host, port) ⇒ Object
Connect to the broker
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 12 def connect( host, port ) port ||= 11300 string = "#{host}:#{port}" begin @conn = Bunny.new @conn.start @ch = @conn.create_channel @x = @ch.default_exchange rescue Exception => e puts 'Error connecting to Beanstalk' puts "Host string, #{string}" if e. == 'Beanstalk::NotConnected' then puts '***Most likely, beanstalk is not running. Start beanstalk, and try running this again.' puts "***If you still get this error, check beanstalk is running at, #{string}" else puts e. puts e.backtrace end abort() end end |
#pop ⇒ Object
Get next msg from queue
45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 45 def pop begin @delivery_info, @properties, @payload = @q.pop(:ack => true) if @delivery_info.nil? then sleep @timeout raise NoMsgToProcess.new end end return @payload end |
#returnToQueue ⇒ Object
58 59 60 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 58 def returnToQueue @ch.reject( @delivery_info.delivery_tag, true ) end |
#send(queueName, msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are healthy
71 72 73 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 71 def send( queueName, msg ) @x.publish(msg, :routing_key => queueName) end |
#subscribe(queuename) ⇒ Object
Connect to the queue
40 41 42 |
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 40 def subscribe( queuename ) @q = @ch.queue( queuename, :durable => true, :auto_delete => false) end |