Class: Thrift::AMQPClientTransport
- Inherits:
-
BaseTransport
- Object
- BaseTransport
- Thrift::AMQPClientTransport
- Defined in:
- lib/thrift/amqp/client.rb
Instance Method Summary collapse
- #close ⇒ Object
- #flush ⇒ Object
-
#initialize(amqp_uri, exchange_name, routing_key) ⇒ AMQPClientTransport
constructor
A new instance of AMQPClientTransport.
- #open ⇒ Object
- #open? ⇒ Boolean
- #read(sz) ⇒ Object
- #write(buf) ⇒ Object
Constructor Details
#initialize(amqp_uri, exchange_name, routing_key) ⇒ AMQPClientTransport
Returns a new instance of AMQPClientTransport.
10 11 12 13 14 15 16 17 |
# File 'lib/thrift/amqp/client.rb', line 10 def initialize(amqp_uri, exchange_name, routing_key) @outbuf = Bytes.empty_byte_buffer @inbuf = StringIO.new @conn = Bunny.new(amqp_uri) @queue = Queue.new @exchange_name, @routing_key = exchange_name, routing_key end |
Instance Method Details
#close ⇒ Object
36 37 38 39 40 41 |
# File 'lib/thrift/amqp/client.rb', line 36 def close if open? @reply_queue.delete @channel.close end end |
#flush ⇒ Object
56 57 58 59 60 61 62 63 64 65 |
# File 'lib/thrift/amqp/client.rb', line 56 def flush @service_exchange.publish( @outbuf, routing_key: @routing_key, correlation_id: self.generate_uuid, reply_to: @reply_queue.name ) @outbuf = Bytes.empty_byte_buffer end |
#open ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/thrift/amqp/client.rb', line 19 def open unless @channel @conn.start @channel = @conn.create_channel @service_exchange = @channel.exchange(@exchange_name) @reply_queue = @channel.queue("", auto_delete: true, exclusive: true) @reply_queue.subscribe(block: false, manual_ack: true) do |delivery_info, properties, payload| @inbuf.write payload @inbuf.rewind @queue << true @channel.acknowledge(delivery_info.delivery_tag, false) end end end |
#open? ⇒ Boolean
43 44 45 |
# File 'lib/thrift/amqp/client.rb', line 43 def open? @channel && @channel.open? end |
#read(sz) ⇒ Object
47 48 49 50 |
# File 'lib/thrift/amqp/client.rb', line 47 def read(sz) @queue.pop if @inbuf.eof? @inbuf.read(sz) end |
#write(buf) ⇒ Object
52 53 54 |
# File 'lib/thrift/amqp/client.rb', line 52 def write(buf) @outbuf << Bytes.force_binary_encoding(buf) end |