Class: Thrift::AMQPServer
- Inherits:
-
BaseServer
- Object
- BaseServer
- Thrift::AMQPServer
- Defined in:
- lib/thrift/amqp/server.rb
Instance Method Summary collapse
-
#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer
constructor
A new instance of AMQPServer.
- #serve ⇒ Object
Constructor Details
#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/thrift/amqp/server.rb', line 6 def initialize(processor, iprot_factory, oprot_factory=nil, opts={}) @processor = processor @iprot_factory = iprot_factory @oprot_factory = oprot_factory || iprot_factory @queue_name = opts[:queue_name] @amqp_uri = opts[:amqp_uri] @routing_key = opts[:routing_key] @exchange_name = opts[:exchange_name] @prefetch = opts[:prefetch] end |
Instance Method Details
#serve ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/thrift/amqp/server.rb', line 19 def serve @conn = Bunny.new(amqp_uri) @conn.start @channel = @conn.create_channel exchange = @channel.direct(@exchange_name) queue = @channel.queue(queue_name) queue.bind exchange @channel.prefetch @prefetch queue.subscribe(block: false) do |delivery_info, properties, payload| trans = MemoryBufferTransport.new(payload) iprot = @iprot_factory.get_protocol(trans) @processor.process(iprot, nil) @channel.acknowledge(delivery_info.delivery_tag, false) end end |