Class: Async::Bus::Protocol::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/async/bus/protocol/connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(peer, id) ⇒ Connection

Returns a new instance of Connection.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/async/bus/protocol/connection.rb', line 46

def initialize(peer, id)
	@peer = peer
	
	@wrapper = Wrapper.new(self)
	@unpacker = @wrapper.unpacker(peer)
	@packer = @wrapper.packer(peer)
	
	@transactions = {}
	@id = id
	
	@objects = {}
	@proxies = ::ObjectSpace::WeakMap.new
	@finalized = Thread::Queue.new
end

Instance Attribute Details

#objectsObject (readonly)

Returns the value of attribute objects.



61
62
63
# File 'lib/async/bus/protocol/connection.rb', line 61

def objects
  @objects
end

#packerObject (readonly)

Returns the value of attribute packer.



65
66
67
# File 'lib/async/bus/protocol/connection.rb', line 65

def packer
  @packer
end

#proxiesObject (readonly)

Returns the value of attribute proxies.



62
63
64
# File 'lib/async/bus/protocol/connection.rb', line 62

def proxies
  @proxies
end

#transactionsObject (readonly)

Returns the value of attribute transactions.



74
75
76
# File 'lib/async/bus/protocol/connection.rb', line 74

def transactions
  @transactions
end

#unpackerObject (readonly)

Returns the value of attribute unpacker.



64
65
66
# File 'lib/async/bus/protocol/connection.rb', line 64

def unpacker
  @unpacker
end

Class Method Details

.client(peer) ⇒ Object



38
39
40
# File 'lib/async/bus/protocol/connection.rb', line 38

def self.client(peer)
	self.new(peer, 1)
end

.server(peer) ⇒ Object



42
43
44
# File 'lib/async/bus/protocol/connection.rb', line 42

def self.server(peer)
	self.new(peer, 2)
end

Instance Method Details

#[](name) ⇒ Object



92
93
94
95
96
97
98
99
100
101
# File 'lib/async/bus/protocol/connection.rb', line 92

def [](name)
	unless proxy = @proxies[name]
		proxy = Proxy.new(self, name)
		@proxies[name] = proxy
		
		ObjectSpace.define_finalizer(proxy, finalize(name))
	end
	
	return proxy
end

#bind(name, object) ⇒ Object



84
85
86
# File 'lib/async/bus/protocol/connection.rb', line 84

def bind(name, object)
	@objects[name] = object
end

#closeObject



157
158
159
160
161
162
163
# File 'lib/async/bus/protocol/connection.rb', line 157

def close
	@transactions.each do |id, transaction|
		transaction.close
	end
	
	@peer.close
end

#invoke(name, arguments, options = {}, &block) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/async/bus/protocol/connection.rb', line 103

def invoke(name, arguments, options = {}, &block)
	
	id = self.next_id
	
	transaction = Transaction.new(self, id)
	@transactions[id] = transaction
	
	transaction.invoke(name, arguments, options, &block)
end

#next_idObject



67
68
69
70
71
72
# File 'lib/async/bus/protocol/connection.rb', line 67

def next_id
	id = @id
	@id += 2
	
	return id
end

#proxy(object) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/async/bus/protocol/connection.rb', line 76

def proxy(object)
	name = next_id.to_s(16).freeze
	
	bind(name, object)
	
	return name
end

#runObject



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/async/bus/protocol/connection.rb', line 113

def run
	finalizer_task = Async do
		while name = @finalized.pop
			@packer.write([:release, name])
		end
	end
	
	@unpacker.each do |message|
		id = message.shift
		
		if id == :release
			name = message.shift
			@objects.delete(name) if name.is_a?(String)
		elsif transaction = @transactions[id]
			transaction.received.enqueue(message)
		elsif message.first == :invoke
			message.shift
			
			transaction = Transaction.new(self, id)
			@transactions[id] = transaction
			
			name = message.shift
			object = @objects[name]
			
			Async do
				transaction.accept(object, *message)
			ensure
				transaction.close
			end
		else
			raise "Out of order message: #{message}"
		end
	end
ensure
	finalizer_task.stop
	
	@transactions.each do |id, transaction|
		transaction.close
	end
	
	@transactions.clear
	@proxies = ::ObjectSpace::WeakMap.new
end