Class: Baykit::BayServer::Agent::Multiplexer::SpinMultiplexer
Defined Under Namespace
Classes: Lapper, ReadIOLapper
Instance Attribute Summary collapse
#agent, #channel_count, #lock, #rudders, #rudders_lock
Instance Method Summary
collapse
#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
Constructor Details
Returns a new instance of SpinMultiplexer.
100
101
102
103
104
105
106
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 100
# Builds a spin multiplexer for the given agent.
#
# @param agt the agent, forwarded unchanged to the superclass constructor
def initialize(agt)
  super(agt)

  # Lappers polled by #process_data, plus the mutex created alongside the list.
  @running_list_lock = Mutex.new
  @running_list = []

  # Counter of idle polling rounds maintained by #process_data.
  @spin_count = 0

  # @agent is not assigned here; presumably the superclass constructor sets it.
  @agent.add_timer_handler(self)
end
|
Instance Attribute Details
#running_list ⇒ Object
Returns the value of attribute running_list.
97
98
99
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 97
# Reader for the list of lappers that #process_data polls.
#
# @return [Array] the array stored in @running_list (the same object, not a copy)
def running_list
@running_list
end
|
#running_list_lock ⇒ Object
Returns the value of attribute running_list_lock.
98
99
100
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 98
# Reader for the mutex created in the constructor next to the running list.
#
# @return [Mutex] the object stored in @running_list_lock
def running_list_lock
@running_list_lock
end
|
#spin_count ⇒ Object
Returns the value of attribute spin_count.
96
97
98
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 96
# Reader for the idle-round counter maintained by #process_data.
#
# @return [Integer] the value stored in @spin_count (starts at 0)
def spin_count
@spin_count
end
|
Instance Method Details
#cancel_read(st) ⇒ Object
194
195
196
197
198
199
200
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 194
# Stops reading on a rudder state: clears its +reading+ flag while holding
# the state's reading lock, then drops the state from the running list.
#
# @param st rudder state exposing +reading_lock+, +reading=+ and +rudder+
# @return whatever remove_from_running_list returns
def cancel_read(st)
  guard = st.reading_lock
  guard.synchronize do
    BayLog.debug("%s Reading off %s", agent, st.rudder)
    st.reading = false
  end

  remove_from_running_list(st)
end
|
#cancel_write(st) ⇒ Object
202
203
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 202
# No-op: this multiplexer takes no action when a write is cancelled.
#
# @param st rudder state (unused)
def cancel_write(st)
end
|
#close_rudder(st) ⇒ Object
236
237
238
239
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 236
# Closes a rudder: the state is first taken off the running list, then the
# superclass implementation performs the actual close.
#
# @param st rudder state to close
# @return whatever the superclass close_rudder returns
def close_rudder(st)
  remove_from_running_list(st)
  super(st)
end
|
#is_empty ⇒ Object
252
253
254
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 252
# Tells whether there is nothing to poll.
# Reads @running_list directly, without taking @running_list_lock.
#
# @return [Boolean] true when the running list holds no lappers
def is_empty
  @running_list.empty?
end
|
#is_non_blocking ⇒ Object
185
186
187
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 185
# Capability flag: this multiplexer always reports itself as blocking.
#
# @return [Boolean] always false
def is_non_blocking
  false
end
|
#next_accept(st) ⇒ Object
205
206
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 205
# No-op: nothing is scheduled for accepting here (req_accept raises
# NotImplementedError in this class).
#
# @param st rudder state (unused)
def next_accept(st)
end
|
#next_read(st) ⇒ Object
208
209
210
211
212
213
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 208
# Schedules the next read for a rudder state: a fresh ReadIOLapper is built
# for it, primed with one call to #next, and appended to the running list.
#
# @param st rudder state to read from
# @return whatever add_to_running_list returns
def next_read(st)
  primed = ReadIOLapper.new(@agent, st).tap { |lapper| lapper.next }
  add_to_running_list(primed)
end
|
#next_write(st) ⇒ Object
215
216
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 215
# No-op: no write is scheduled here.
# NOTE(review): req_write marks the state as writing and then calls this
# method, so queued write units are not drained by anything visible in this
# class - confirm that writes are serviced elsewhere.
#
# @param st rudder state (unused)
def next_write(st)
end
|
#on_busy ⇒ Object
218
219
220
221
222
223
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 218
# Busy notification hook. Logs the event and walks the keys of
# BayServer.anchorable_port_map; the per-key block is empty, so no action is
# taken for any of them.
#
# @return [Array] the key list that was iterated
def on_busy
  BayLog.debug("%s onBusy", agent)
  BayServer.anchorable_port_map.keys.each { |_rd| }
end
|
#on_free ⇒ Object
225
226
227
228
229
230
231
232
233
234
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 225
# Free notification hook. Logs the event together with the agent's aborted
# flag and stops there when the agent has been aborted. Otherwise it walks
# the keys of BayServer.anchorable_port_map with an empty per-key block, so
# no action is taken for any of them.
#
# @return [Array, nil] the iterated key list, or nil when the agent is aborted
def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted)
  return if agent.aborted

  BayServer.anchorable_port_map.keys.each { |_rd| }
end
|
#on_timer ⇒ Object
245
246
247
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 245
# Timer callback with an empty body. The constructor registers this object
# through @agent.add_timer_handler(self); presumably the agent invokes this
# method periodically (confirm against the agent implementation).
def on_timer
end
|
#process_data ⇒ Object
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 256
# Runs one polling round over the running list.
#
# Every lapper's #lap is called once and its rudder state is touched through
# #access. When every lapper reports that it merely spun, the idle-round
# counter is incremented, and once more than 10 consecutive idle rounds have
# accumulated the thread sleeps for 10 ms instead of busy-looping. Any round
# in which at least one lapper did not spin resets the counter.
#
# The reset used to be attached to the inner "@spin_count > 10" test, which
# zeroed the counter on every idle round and made the back-off unreachable.
#
# @return [Boolean] false when the running list is empty, true otherwise
def process_data
  return false if is_empty

  all_spun = true
  # Walk by index from the tail, as before - presumably so that a lapper
  # which removes itself during #lap does not shift entries not yet visited.
  @running_list.length.downto(1) do |i|
    lpr = @running_list[i - 1]
    st = lpr.state
    spun = lpr.lap
    st.access
    # Non-short-circuit AND keeps the flag a strict boolean.
    all_spun &= spun
  end

  if all_spun
    @spin_count += 1
    sleep(0.01) if @spin_count > 10
  else
    @spin_count = 0
  end

  true
end
|
#req_accept(rd) ⇒ Object
115
116
117
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 115
# Accepting is not supported by this multiplexer.
#
# @param rd rudder (unused)
# @raise [NotImplementedError] always; it descends from ScriptError, so a
#   bare +rescue+ will not catch it
def req_accept(rd)
  raise NotImplementedError
end
|
#req_close(rd) ⇒ Object
174
175
176
177
178
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 174
# Closes the rudder and notifies the agent: the rudder's state is looked up,
# closed through #close_rudder, and a closed letter is sent with +false+ as
# the second argument.
# NOTE(review): unlike #req_read, a missing (nil) state is not checked here.
#
# @param rd rudder to close
# @return whatever @agent.send_closed_letter returns
def req_close(rd)
  state = get_rudder_state(rd)
  close_rudder(state)
  @agent.send_closed_letter(state, false)
end
|
#req_connect(rd, adr) ⇒ Object
119
120
121
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 119
# Connecting is not supported by this multiplexer.
#
# @param rd rudder (unused)
# @param adr address (unused)
# @raise [NotImplementedError] always; it descends from ScriptError, so a
#   bare +rescue+ will not catch it
def req_connect(rd, adr)
  raise NotImplementedError
end
|
#req_end(rd) ⇒ Object
169
170
171
172
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 169
# Flags the rudder's state as finished by setting its +finale+ attribute.
# NOTE(review): a missing (nil) state is not checked here.
#
# @param rd rudder whose state is flagged
# @return [Boolean] true (the assigned value)
def req_end(rd)
  get_rudder_state(rd).finale = true
end
|
#req_read(rd) ⇒ Object
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 123
# Requests reading on a rudder. An unknown rudder is logged as an error and
# ignored. Otherwise the state's +reading+ flag is raised under its reading
# lock, and #next_read is invoked only by the call that actually flipped the
# flag, so a rudder that is already reading is not scheduled a second time.
#
# @param rd rudder to read from
# @return the result of next_read when reading was started, nil otherwise
def req_read(rd)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.error("%s Invalid rudder", self)
    return
  end

  # The block yields true only when this call moved the flag from off to on.
  started = st.reading_lock.synchronize do
    next false if st.reading

    st.reading = true
  end

  next_read(st) if started
end
|
#req_write(rd, buf, len, adr, tag, &lis) ⇒ Object
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 143
# Queues a write for a rudder.
#
# For an unknown rudder a warning is logged, the listener (if given) is
# called, and the method returns. Previously it fell through and raised
# NoMethodError on the nil state.
#
# Otherwise a WriteUnit is appended to the state's write queue under the
# queue lock, the state is touched via #access, and #next_write is invoked
# only by the call that flipped the +writing+ flag from off to on.
#
# @param rd rudder to write to
# @param buf data to write
# @param len length argument; not used by this implementation
# @param adr destination address handed to the WriteUnit
# @param tag caller tag handed to the WriteUnit
# @param lis listener block handed to the WriteUnit
# @return the result of next_write when writing was started, nil otherwise
def req_write(rd, buf, len, adr, tag, &lis)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.warn("Invalid rudder")
    lis&.call
    return
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end
  st.access

  need_write = false
  st.writing_lock.synchronize do
    unless st.writing
      need_write = true
      st.writing = true
    end
  end

  next_write(st) if need_write
end
|
#shutdown ⇒ Object
181
182
183
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 181
# Shuts the multiplexer down by delegating to #wakeup, which is defined
# outside this excerpt.
#
# @return whatever wakeup returns
def shutdown
wakeup
end
|
#to_s ⇒ Object
107
108
109
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 107
def to_s
return "SpnMpx[#{@agent}]"
end
|
#use_async_api ⇒ Object
189
190
191
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 189
# Capability flag: this multiplexer never uses an asynchronous API.
#
# @return [Boolean] always false
def use_async_api
  false
end
|