Class: Thrift::AMQPClientTransport

Inherits:
BaseTransport
  • Object
show all
Defined in:
lib/thrift/amqp/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(amqp_uri, exchange_name, routing_key) ⇒ AMQPClientTransport

Returns a new instance of AMQPClientTransport.



10
11
12
13
14
15
16
17
# File 'lib/thrift/amqp/client.rb', line 10

# Builds a client transport that will talk to a Thrift service over AMQP.
#
# Only local state is prepared here; the Bunny session created from
# +amqp_uri+ is started later, by #open.
#
# @param amqp_uri [String] connection URI handed to Bunny.new
# @param exchange_name [String] name of the exchange requests are published to
# @param routing_key [String] routing key attached to every published request
def initialize(amqp_uri, exchange_name, routing_key)
  # Where requests are sent.
  @exchange_name = exchange_name
  @routing_key = routing_key

  # Broker session plus the queue the reply subscriber uses to signal #read.
  @conn = Bunny.new(amqp_uri)
  @queue = Queue.new

  # Incoming reply bytes and outgoing request bytes.
  @inbuf = StringIO.new
  @outbuf = Bytes.empty_byte_buffer
end

Instance Method Details

#close ⇒ Object



36
37
38
39
40
41
# File 'lib/thrift/amqp/client.rb', line 36

# Closes the transport and releases its broker resources.
#
# When the channel is open, the reply queue is deleted and the channel is
# closed. The underlying connection is then closed as well (if it is open),
# because #open starts it and nothing else ever shuts it down. The
# connection is closed from an +ensure+ so it is released even when the
# queue/channel teardown raises.
#
# Calling this on a transport that was never opened, or twice in a row, is a
# no-op.
#
# @return [Object, nil] the result of closing the channel, or nil when the
#   channel was not open
def close
  if open?
    @reply_queue.delete
    @channel.close
  end
ensure
  @conn.close if @conn.open?
end

#flush ⇒ Object



56
57
58
59
60
61
62
63
64
65
# File 'lib/thrift/amqp/client.rb', line 56

# Publishes everything accumulated by #write as a single message and starts a
# fresh output buffer.
#
# The message is sent to the service exchange with the configured routing
# key, a correlation id from +generate_uuid+, and +reply_to+ set to the name
# of the reply queue.
#
# The output buffer is reset even when publishing raises; otherwise the bytes
# of the failed request would be prepended to the next one.
#
# @return [String] the new, empty output buffer
def flush
  begin
    @service_exchange.publish(
      @outbuf,
      routing_key: @routing_key,
      correlation_id: generate_uuid,
      reply_to: @reply_queue.name
    )
  ensure
    @outbuf = Bytes.empty_byte_buffer
  end

  @outbuf
end

#open ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/thrift/amqp/client.rb', line 19

# Opens the transport.
#
# Starts the AMQP connection, creates a channel, looks up the service
# exchange by name and declares a server-named, exclusive, auto-delete reply
# queue. A subscriber on that queue stores each reply payload for #read,
# signals the waiting reader through the internal queue, and then
# acknowledges the delivery.
#
# Does nothing when the channel is already open. A transport whose channel
# has been closed can be opened again; a new channel and reply queue are
# created.
#
# @return [Object, nil] the result of the subscribe call, or nil when the
#   transport was already open
def open
  return if open?

  @conn.start
  @channel = @conn.create_channel
  @service_exchange = @channel.exchange(@exchange_name)
  @reply_queue = @channel.queue("", auto_delete: true, exclusive: true)

  # NOTE(review): with block: false the handler presumably runs on a Bunny
  # consumer thread rather than the caller's thread - confirm.
  @reply_queue.subscribe(block: false, manual_ack: true) do |delivery_info, _properties, payload|
    # Start a fresh buffer for every reply. Writing into the previous buffer
    # and rewinding would put the read position back on the bytes of the
    # reply that was already consumed.
    @inbuf = StringIO.new(payload)
    @queue << true
    @channel.acknowledge(delivery_info.delivery_tag, false)
  end
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/thrift/amqp/client.rb', line 43

# Tells whether the transport currently has an open channel.
#
# @return [Boolean, nil] whatever the channel's own +open?+ reports, or the
#   falsy channel value itself (nil before #open has created a channel)
def open?
  return @channel unless @channel

  @channel.open?
end

#read(sz) ⇒ Object



47
48
49
50
# File 'lib/thrift/amqp/client.rb', line 47

# Reads up to +sz+ bytes of reply data.
#
# When the input buffer has nothing left, this first blocks on the internal
# queue until the reply subscriber signals that a payload has arrived.
#
# @param sz [Integer] maximum number of bytes to read
# @return [String, nil] whatever the input buffer's +read+ returns for +sz+
def read(sz)
  buffer_drained = @inbuf.eof?
  @queue.pop if buffer_drained

  # Look the buffer up again only now, after any wait has finished.
  @inbuf.read(sz)
end

#write(buf) ⇒ Object



52
53
54
# File 'lib/thrift/amqp/client.rb', line 52

# Appends +buf+ to the pending request; nothing is sent until #flush.
#
# @param buf [String] bytes to queue, passed through
#   Bytes.force_binary_encoding before being appended
# @return [String] the output buffer after the append
def write(buf)
  binary_chunk = Bytes.force_binary_encoding(buf)
  @outbuf << binary_chunk
end