Class: Thrift::AMQPServer
- Inherits:
-
BaseServer
- Object
- BaseServer
- Thrift::AMQPServer
- Defined in:
- lib/thrift/amqp/server.rb
Instance Method Summary collapse
- #handle(delivery_info, properties, payload) ⇒ Object
-
#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer
constructor
A new instance of AMQPServer.
- #serve ⇒ Object
Constructor Details
#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer
Returns a new instance of AMQPServer.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/thrift/amqp/server.rb', line 10 def initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) @processor = processor @iprot_factory = iprot_factory @oprot_factory = oprot_factory || iprot_factory @queue_name = opts[:queue_name] @amqp_uri = opts[:amqp_uri] @routing_key = opts[:routing_key] @exchange_name = opts[:exchange_name] @prefetch = opts[:prefetch] end |
Instance Method Details
#handle(delivery_info, properties, payload) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/thrift/amqp/server.rb', line 22 def handle(delivery_info, properties, payload) input = StringIO.new payload out = StringIO.new transport = IOStreamTransport.new input, out protocol = @iprot_factory.get_protocol transport begin @processor.process protocol, protocol if out.length > 0 out.rewind @channel.default_exchange.publish( out.read(out.length), routing_key: properties.reply_to ) end rescue => e LOGGER.error("Processor failure #{e}") end @channel.acknowledge(delivery_info.delivery_tag, false) end |
#serve ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/thrift/amqp/server.rb', line 45 def serve @conn = Bunny.new(@amqp_uri) @conn.start @channel = @conn.create_channel exchange = @channel.direct(@exchange_name) queue = @channel.queue(@queue_name) queue.bind exchange, routing_key: @routing_key @channel.prefetch @prefetch loop do LOGGER.info("Fetching message from #{@queue_name}") queue.subscribe( manual_ack: true, block: true ) do |delivery_info, properties, payload| handle delivery_info, properties, payload end end rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::ConnectionClosedError LOGGER.error("Can't establish the connection") sleep 5 retry end |