Class: Async::Bus::Protocol::Transaction
- Inherits:
-
Object
- Object
- Async::Bus::Protocol::Transaction
- Defined in:
- lib/async/bus/protocol/transaction.rb
Overview
Represents a transaction for a remote procedure call.
Instance Attribute Summary collapse
-
#accept(object, arguments, options, block_given) ⇒ Object
readonly
Accept a remote procedure invocation.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#received ⇒ Object
readonly
Returns the value of attribute received.
- #The queue of received messages.(queueofreceivedmessages.) ⇒ Object readonly
- #The transaction ID.(transactionID.) ⇒ Object readonly
-
#timeout ⇒ Object
Returns the value of attribute timeout.
Instance Method Summary collapse
-
#close ⇒ Object
Close the transaction and clean up resources.
-
#initialize(connection, id, timeout: nil) ⇒ Transaction
constructor
Initialize a new transaction.
-
#invoke(name, arguments, options, &block) ⇒ Object
Invoke a remote procedure.
-
#push(message) ⇒ Object
Push a message to the transaction’s received queue.
-
#read ⇒ Object
Read a message from the transaction queue.
- #The accept handler.=(accepthandler. = (value)) ⇒ Object
- #The connection for this transaction.=(connection) ⇒ Object
- #The timeout for the transaction.=(timeout) ⇒ Object
-
#write(message) ⇒ Object
Write a message to the connection.
Constructor Details
#initialize(connection, id, timeout: nil) ⇒ Transaction
Initialize a new transaction.
17 18 19 20 21 22 23 24 25 |
# File 'lib/async/bus/protocol/transaction.rb', line 17 def initialize(connection, id, timeout: nil) @connection = connection @id = id @timeout = timeout @received = Thread::Queue.new @accept = nil end |
Instance Attribute Details
#accept(object, arguments, options, block_given) ⇒ Object (readonly)
Accept a remote procedure invocation.
40 41 42 |
# File 'lib/async/bus/protocol/transaction.rb', line 40 def accept @accept end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
28 29 30 |
# File 'lib/async/bus/protocol/transaction.rb', line 28 def connection @connection end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
31 32 33 |
# File 'lib/async/bus/protocol/transaction.rb', line 31 def id @id end |
#received ⇒ Object (readonly)
Returns the value of attribute received.
37 38 39 |
# File 'lib/async/bus/protocol/transaction.rb', line 37 def received @received end |
#The queue of received messages.(queueofreceivedmessages.) ⇒ Object (readonly)
37 |
# File 'lib/async/bus/protocol/transaction.rb', line 37 attr :received |
#The transaction ID.(transactionID.) ⇒ Object (readonly)
31 |
# File 'lib/async/bus/protocol/transaction.rb', line 31 attr :id |
#timeout ⇒ Object
Returns the value of attribute timeout.
34 35 36 |
# File 'lib/async/bus/protocol/transaction.rb', line 34 def timeout @timeout end |
Instance Method Details
#close ⇒ Object
Close the transaction and clean up resources.
73 74 75 76 77 78 79 80 |
# File 'lib/async/bus/protocol/transaction.rb', line 73 def close if connection = @connection @connection = nil @received.close connection.transactions.delete(@id) end end |
#invoke(name, arguments, options, &block) ⇒ Object
Invoke a remote procedure.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/async/bus/protocol/transaction.rb', line 88 def invoke(name, arguments, , &block) Console.debug(self){[name, arguments, , block]} self.write(Invoke.new(@id, name, arguments, , block_given?)) while response = self.read case response when Return return response.result when Yield begin result = yield(*response.result) self.write(Next.new(@id, result)) rescue => error self.write(Error.new(@id, error)) end when Error raise(response.result) when Throw # Re-throw the tag and value that was thrown on the server side # Throw.result contains [tag, value] array tag, value = response.result throw(tag, value) end end end |
#push(message) ⇒ Object
Push a message to the transaction’s received queue. Silently ignores messages if the queue is already closed.
66 67 68 69 70 |
# File 'lib/async/bus/protocol/transaction.rb', line 66 def push() @received.push() rescue ClosedQueueError # Queue is closed (transaction already finished/closed) - ignore silently. end |
#read ⇒ Object
Read a message from the transaction queue.
44 45 46 47 48 49 50 |
# File 'lib/async/bus/protocol/transaction.rb', line 44 def read if @received.empty? @connection.flush end @received.pop(timeout: @timeout) end |
#The accept handler.=(accepthandler. = (value)) ⇒ Object
40 |
# File 'lib/async/bus/protocol/transaction.rb', line 40 attr :accept |
#The connection for this transaction.=(connection) ⇒ Object
28 |
# File 'lib/async/bus/protocol/transaction.rb', line 28 attr :connection |
#The timeout for the transaction.=(timeout) ⇒ Object
34 |
# File 'lib/async/bus/protocol/transaction.rb', line 34 attr_accessor :timeout |
#write(message) ⇒ Object
Write a message to the connection.
55 56 57 58 59 60 61 |
# File 'lib/async/bus/protocol/transaction.rb', line 55 def write() if @connection @connection.write() else raise RuntimeError, "Transaction is closed!" end end |