Class: Async::Bus::Protocol::Transaction

Inherits:
Object
  • Object
show all
Defined in:
lib/async/bus/protocol/transaction.rb

Overview

Represents a transaction for a remote procedure call.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, id, timeout: nil) ⇒ Transaction

Initialize a new transaction.



17
18
19
20
21
22
23
24
25
# File 'lib/async/bus/protocol/transaction.rb', line 17

def initialize(connection, id, timeout: nil)
  @connection = connection
  @id = id
  
  @timeout = timeout
  
  @received = Thread::Queue.new
  @accept = nil
end

Instance Attribute Details

#accept(object, arguments, options, block_given) ⇒ Object (readonly)

Accept a remote procedure invocation.



40
41
42
# File 'lib/async/bus/protocol/transaction.rb', line 40

def accept
  @accept
end

#connectionObject (readonly)

Returns the value of attribute connection.



28
29
30
# File 'lib/async/bus/protocol/transaction.rb', line 28

def connection
  @connection
end

#idObject (readonly)

Returns the value of attribute id.



31
32
33
# File 'lib/async/bus/protocol/transaction.rb', line 31

def id
  @id
end

#receivedObject (readonly)

Returns the value of attribute received.



37
38
39
# File 'lib/async/bus/protocol/transaction.rb', line 37

def received
  @received
end

#The queue of received messages.(queueofreceivedmessages.) ⇒ Object (readonly)



37
# File 'lib/async/bus/protocol/transaction.rb', line 37

attr :received

#The transaction ID.(transactionID.) ⇒ Object (readonly)



31
# File 'lib/async/bus/protocol/transaction.rb', line 31

attr :id

#timeoutObject

Returns the value of attribute timeout.



34
35
36
# File 'lib/async/bus/protocol/transaction.rb', line 34

def timeout
  @timeout
end

Instance Method Details

#closeObject

Close the transaction and clean up resources.



73
74
75
76
77
78
79
80
# File 'lib/async/bus/protocol/transaction.rb', line 73

def close
  if connection = @connection
    @connection = nil
    @received.close
    
    connection.transactions.delete(@id)
  end
end

#invoke(name, arguments, options, &block) ⇒ Object

Invoke a remote procedure.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/async/bus/protocol/transaction.rb', line 88

def invoke(name, arguments, options, &block)
  Console.debug(self){[name, arguments, options, block]}
  
  self.write(Invoke.new(@id, name, arguments, options, block_given?))
  
  while response = self.read
    case response
    when Return
      return response.result
    when Yield
      begin
        result = yield(*response.result)
        self.write(Next.new(@id, result))
      rescue => error
        self.write(Error.new(@id, error))
      end
    when Error
      raise(response.result)
    when Throw
      # Re-throw the tag and value that was thrown on the server side
      # Throw.result contains [tag, value] array
      tag, value = response.result
      throw(tag, value)
    end
  end
end

#push(message) ⇒ Object

Push a message to the transaction’s received queue. Silently ignores messages if the queue is already closed.



66
67
68
69
70
# File 'lib/async/bus/protocol/transaction.rb', line 66

def push(message)
  @received.push(message)
rescue ClosedQueueError
  # Queue is closed (transaction already finished/closed) - ignore silently.
end

#readObject

Read a message from the transaction queue.



44
45
46
47
48
49
50
# File 'lib/async/bus/protocol/transaction.rb', line 44

def read
  if @received.empty?
    @connection.flush
  end
  
  @received.pop(timeout: @timeout)
end

#The accept handler.=(accepthandler. = (value)) ⇒ Object



40
# File 'lib/async/bus/protocol/transaction.rb', line 40

attr :accept

#The connection for this transaction.=(connection) ⇒ Object



28
# File 'lib/async/bus/protocol/transaction.rb', line 28

attr :connection

#The timeout for the transaction.=(timeout) ⇒ Object



34
# File 'lib/async/bus/protocol/transaction.rb', line 34

attr_accessor :timeout

#write(message) ⇒ Object

Write a message to the connection.



55
56
57
58
59
60
61
# File 'lib/async/bus/protocol/transaction.rb', line 55

def write(message)
  if @connection
    @connection.write(message)
  else
    raise RuntimeError, "Transaction is closed!"
  end
end