Class: Thrift::AMQPServer

Inherits:
BaseServer
  • Object
show all
Defined in:
lib/thrift/amqp/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(processor, iprot_factory, oprot_factory = nil, opts = {}) ⇒ AMQPServer



6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/thrift/amqp/server.rb', line 6

def initialize(processor, iprot_factory, oprot_factory=nil, opts={})

  @processor = processor
  @iprot_factory = iprot_factory
  @oprot_factory = oprot_factory || iprot_factory

  @queue_name = opts[:queue_name]
  @amqp_uri = opts[:amqp_uri]
  @routing_key = opts[:routing_key]
  @exchange_name = opts[:exchange_name]
  @prefetch = opts[:prefetch]
end

Instance Method Details

#serveObject



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/thrift/amqp/server.rb', line 19

def serve
  @conn = Bunny.new(amqp_uri)

  @conn.start
  @channel = @conn.create_channel

  exchange = @channel.direct(@exchange_name)
  queue = @channel.queue(queue_name)
  queue.bind exchange

  @channel.prefetch @prefetch

  queue.subscribe(block: false) do |delivery_info, properties, payload|
    trans = MemoryBufferTransport.new(payload)
    iprot = @iprot_factory.get_protocol(trans)

    @processor.process(iprot, nil)

    @channel.acknowledge(delivery_info.delivery_tag, false)
  end
end