Class: Async::Bus::Protocol::Connection
- Inherits:
-
Object
- Object
- Async::Bus::Protocol::Connection
- Defined in:
- lib/async/bus/protocol/connection.rb
Instance Attribute Summary collapse
-
#objects ⇒ Object
readonly
Returns the value of attribute objects.
-
#packer ⇒ Object
readonly
Returns the value of attribute packer.
-
#proxies ⇒ Object
readonly
Returns the value of attribute proxies.
-
#transactions ⇒ Object
readonly
Returns the value of attribute transactions.
-
#unpacker ⇒ Object
readonly
Returns the value of attribute unpacker.
Class Method Summary collapse
- .client(peer) ⇒ Object
- .server(peer) ⇒ Object
Instance Method Summary collapse
- #[](name) ⇒ Object
- #bind(name, object) ⇒ Object
- #close ⇒ Object
-
#initialize(peer, id) ⇒ Connection
constructor
A new instance of Connection.
- #invoke(name, arguments, options = {}, &block) ⇒ Object
- #next_id ⇒ Object
- #proxy(object) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(peer, id) ⇒ Connection
Returns a new instance of Connection.
46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/async/bus/protocol/connection.rb', line 46 def initialize(peer, id) @peer = peer @wrapper = Wrapper.new(self) @unpacker = @wrapper.unpacker(peer) @packer = @wrapper.packer(peer) @transactions = {} @id = id @objects = {} @proxies = ::ObjectSpace::WeakMap.new @finalized = Thread::Queue.new end |
Instance Attribute Details
#objects ⇒ Object (readonly)
Returns the value of attribute objects.
61 62 63 |
# File 'lib/async/bus/protocol/connection.rb', line 61 def objects @objects end |
#packer ⇒ Object (readonly)
Returns the value of attribute packer.
65 66 67 |
# File 'lib/async/bus/protocol/connection.rb', line 65 def packer @packer end |
#proxies ⇒ Object (readonly)
Returns the value of attribute proxies.
62 63 64 |
# File 'lib/async/bus/protocol/connection.rb', line 62 def proxies @proxies end |
#transactions ⇒ Object (readonly)
Returns the value of attribute transactions.
74 75 76 |
# File 'lib/async/bus/protocol/connection.rb', line 74 def transactions @transactions end |
#unpacker ⇒ Object (readonly)
Returns the value of attribute unpacker.
64 65 66 |
# File 'lib/async/bus/protocol/connection.rb', line 64 def unpacker @unpacker end |
Class Method Details
.client(peer) ⇒ Object
38 39 40 |
# File 'lib/async/bus/protocol/connection.rb', line 38 def self.client(peer) self.new(peer, 1) end |
.server(peer) ⇒ Object
42 43 44 |
# File 'lib/async/bus/protocol/connection.rb', line 42 def self.server(peer) self.new(peer, 2) end |
Instance Method Details
#[](name) ⇒ Object
92 93 94 95 96 97 98 99 100 101 |
# File 'lib/async/bus/protocol/connection.rb', line 92 def [](name) unless proxy = @proxies[name] proxy = Proxy.new(self, name) @proxies[name] = proxy ObjectSpace.define_finalizer(proxy, finalize(name)) end return proxy end |
#bind(name, object) ⇒ Object
84 85 86 |
# File 'lib/async/bus/protocol/connection.rb', line 84 def bind(name, object) @objects[name] = object end |
#close ⇒ Object
157 158 159 160 161 162 163 |
# File 'lib/async/bus/protocol/connection.rb', line 157 def close @transactions.each do |id, transaction| transaction.close end @peer.close end |
#invoke(name, arguments, options = {}, &block) ⇒ Object
103 104 105 106 107 108 109 110 111 |
# File 'lib/async/bus/protocol/connection.rb', line 103 def invoke(name, arguments, options = {}, &block) id = self.next_id transaction = Transaction.new(self, id) @transactions[id] = transaction transaction.invoke(name, arguments, options, &block) end |
#next_id ⇒ Object
67 68 69 70 71 72 |
# File 'lib/async/bus/protocol/connection.rb', line 67 def next_id id = @id @id += 2 return id end |
#proxy(object) ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/async/bus/protocol/connection.rb', line 76 def proxy(object) name = next_id.to_s(16).freeze bind(name, object) return name end |
#run ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/async/bus/protocol/connection.rb', line 113 def run finalizer_task = Async do while name = @finalized.pop @packer.write([:release, name]) end end @unpacker.each do |message| id = message.shift if id == :release name = message.shift @objects.delete(name) if name.is_a?(String) elsif transaction = @transactions[id] transaction.received.enqueue(message) elsif message.first == :invoke message.shift transaction = Transaction.new(self, id) @transactions[id] = transaction name = message.shift object = @objects[name] Async do transaction.accept(object, *message) ensure transaction.close end else raise "Out of order message: #{message}" end end ensure finalizer_task.stop @transactions.each do |id, transaction| transaction.close end @transactions.clear @proxies = ::ObjectSpace::WeakMap.new end |