Class: Thrift::AMQPServer

Inherits:
BaseServer
  • Object
show all
Defined in:
lib/thrift/amqp/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer

Returns a new instance of AMQPServer.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/thrift/amqp/server.rb', line 10

# Stores the processor, the protocol factories and the AMQP settings on the
# new server instance. Nothing is connected here; the values are only kept
# in instance variables.
#
# @param processor [Object] stored as @processor
# @param iprot_factory [Object] stored as @iprot_factory
# @param oprot_factory [Object, nil] stored as @oprot_factory; when falsy,
#   iprot_factory is stored there instead
# @param opts [Hash] settings read with opts[key] for the keys :queue_name,
#   :amqp_uri, :routing_key, :exchange_name and :prefetch
def initialize(processor, iprot_factory, oprot_factory = nil, opts = {})
  @processor = processor
  @iprot_factory = iprot_factory
  # A single factory serves both directions unless a second one is given.
  @oprot_factory = oprot_factory || iprot_factory

  # Every setting lands in the instance variable of the same name, holding
  # whatever opts[key] returns (nil for a key missing from a plain Hash).
  %i[queue_name amqp_uri routing_key exchange_name prefetch].each do |setting|
    instance_variable_set("@#{setting}", opts[setting])
  end
end

Instance Method Details

#handle(delivery_info, properties, payload) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/thrift/amqp/server.rb', line 22

# Runs one delivered message through the processor, publishes any response
# and acknowledges the delivery.
#
# The payload is exposed to the processor through an in-memory
# IOStreamTransport (StringIO for input and output). Whatever the processor
# writes to the output buffer is published on the channel's default exchange
# with the request's reply_to as routing key.
#
# @param delivery_info [#delivery_tag] its delivery_tag is acknowledged
# @param properties [#reply_to] reply_to is used as the response routing key
# @param payload [String] request bytes handed to StringIO.new
# @return [Object] the result of @channel.acknowledge
def handle(delivery_info, properties, payload)
  input = StringIO.new payload
  out = StringIO.new
  transport = IOStreamTransport.new input, out
  iprot = @iprot_factory.get_protocol transport
  # Honour the output factory stored by the constructor. When it is the very
  # same object as the input factory (the constructor's default), one
  # protocol instance is shared for both directions.
  oprot = @oprot_factory.equal?(@iprot_factory) ? iprot : @oprot_factory.get_protocol(transport)

  begin
    @processor.process iprot, oprot

    # Nothing is published when the processor wrote no output.
    if out.length > 0
      out.rewind
      @channel.default_exchange.publish(
        out.read(out.length),
        routing_key: properties.reply_to
      )
    end
  rescue => e
    LOGGER.error("Processor failure #{e}")
  end

  # Runs after the rescue, so the delivery is acknowledged even when
  # processing or publishing raised a StandardError.
  @channel.acknowledge(delivery_info.delivery_tag, false)
end

#serve ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/thrift/amqp/server.rb', line 45

# Connects to the broker at @amqp_uri, binds @queue_name to the direct
# exchange @exchange_name under @routing_key, and then consumes messages,
# passing each one to #handle.
#
# When the connection cannot be established or is closed, the error is
# logged and the whole method body is retried after 5 seconds.
#
# @return [Object] whatever the consuming loop returns, if it ever ends
def serve
  @conn = Bunny.new(@amqp_uri)

  @conn.start
  @channel = @conn.create_channel

  exchange = @channel.direct(@exchange_name)
  queue = @channel.queue(@queue_name)
  queue.bind exchange, routing_key: @routing_key

  # :prefetch is optional in the constructor, so @prefetch may be nil.
  # Channel#prefetch expects an integer count; skip the call when none is set.
  @channel.prefetch @prefetch if @prefetch

  loop do
    LOGGER.info("Fetching message from #{@queue_name}")
    # block: true keeps this call from returning while the consumer is active;
    # the surrounding loop subscribes again if it does return.
    queue.subscribe(
      manual_ack: true,
      block: true
    ) do |delivery_info, properties, payload|
      handle delivery_info, properties, payload
    end
  end
rescue Bunny::TCPConnectionFailedForAllHosts, Bunny::ConnectionClosedError
  LOGGER.error("Can't establish the connection")
  sleep 5

  # NOTE(review): retry builds a fresh Bunny session without closing the
  # previous @conn; confirm this does not leave a recovering session behind.
  retry
end