Class: Baykit::BayServer::Agent::Multiplexer::SpinMultiplexer
Defined Under Namespace
Classes: Lapper, ReadIOLapper
Instance Attribute Summary collapse
#agent, #channel_count, #lock, #rudders, #rudders_lock
Instance Method Summary
collapse
#add_rudder_state, #close_all, #close_timeout_sockets, #consume_oldest_unit, #find_rudder_state_by_key, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
#add_rudder_state, #consume_oldest_unit, #get_rudder_state, #get_transporter, #is_busy, #remove_rudder_state
Constructor Details
Returns a new instance of SpinMultiplexer.
104
105
106
107
108
109
110
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 104
# Builds a spin multiplexer for the given agent.
#
# Forwards +agt+ to the superclass constructor, creates an empty running
# list together with a Mutex stored in @running_list_lock, zeroes the spin
# counter, and finally registers this object as a timer handler on @agent.
#
# @param agt the owning agent; passed unchanged to the superclass
#   (presumably the superclass assigns it to @agent -- confirm)
def initialize(agt)
  super
  @running_list_lock = Mutex.new
  @running_list = []
  @spin_count = 0
  @agent.add_timer_handler(self)
end
|
Instance Attribute Details
#running_list ⇒ Object
Returns the value of attribute running_list.
101
102
103
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 101
# Reader for @running_list.
#
# Returns the very same object that process_data iterates over (no copy is
# made), so mutations by the caller are visible to the multiplexer.
def running_list
@running_list
end
|
#running_list_lock ⇒ Object
Returns the value of attribute running_list_lock.
102
103
104
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 102
# Reader for @running_list_lock, the Mutex created in the constructor.
#
# NOTE(review): presumably callers hold this lock while touching the running
# list -- the code that does so is not visible here; confirm.
def running_list_lock
@running_list_lock
end
|
#spin_count ⇒ Object
Returns the value of attribute spin_count.
100
101
102
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 100
# Reader for @spin_count, the counter that process_data updates after each
# pass over the running list.
def spin_count
@spin_count
end
|
Instance Method Details
#cancel_read(st) ⇒ Object
199
200
201
202
203
204
205
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 199
# Stops reading for the given rudder state.
#
# While holding the state's reading lock, logs a debug line and clears the
# +reading+ flag; afterwards (outside the lock) removes the state from the
# running list.
#
# @param st the rudder state; must respond to reading_lock, rudder and reading=
# @return whatever remove_from_running_list returns
def cancel_read(st)
  lock = st.reading_lock
  lock.synchronize do
    BayLog.debug("%s Reading off %s", agent, st.rudder)
    st.reading = false
  end

  remove_from_running_list(st)
end
|
#cancel_write(st) ⇒ Object
207
208
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 207
# No-op in this multiplexer: the argument is ignored and nil is returned.
#
# @param st rudder state (unused)
# @return [nil]
def cancel_write(st)
  nil
end
|
#close_rudder(st) ⇒ Object
241
242
243
244
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 241
# Removes the given item from the running list, then lets the superclass
# implementation perform the actual close.
#
# @param st forwarded unchanged to remove_from_running_list and to super
# @return whatever the superclass close_rudder returns
def close_rudder(st)
  remove_from_running_list(st)
  super(st)
end
|
#is_empty ⇒ Object
257
258
259
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 257
# Tells whether the running list currently holds no entries.
#
# @return [Boolean] result of @running_list.empty?
def is_empty
  @running_list.empty?
end
|
#is_non_blocking ⇒ Object
190
191
192
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 190
# Always reports false for this multiplexer.
#
# @return [false]
def is_non_blocking
  false
end
|
#next_accept(st) ⇒ Object
210
211
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 210
# No-op in this multiplexer: the argument is ignored and nil is returned.
#
# @param st rudder state (unused)
# @return [nil]
def next_accept(st)
  nil
end
|
#next_read(st) ⇒ Object
213
214
215
216
217
218
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 213
# Schedules a read for the given rudder state.
#
# Creates a ReadIOLapper for (self, @agent, st), calls +next+ on it once,
# and then appends it to the running list via add_to_running_list.
#
# @param st the rudder state to read from
# @return whatever add_to_running_list returns
def next_read(st)
  lapper = ReadIOLapper.new(self, @agent, st).tap(&:next)
  add_to_running_list(lapper)
end
|
#next_write(st) ⇒ Object
220
221
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 220
# No-op in this multiplexer: the argument is ignored and nil is returned.
# req_write still calls this after queueing a write unit.
#
# @param st rudder state (unused)
# @return [nil]
def next_write(st)
  nil
end
|
#on_busy ⇒ Object
223
224
225
226
227
228
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 223
# Hook invoked when the agent becomes busy.
#
# Logs a debug line and then walks the keys of BayServer.anchorable_port_map
# with an empty block, i.e. no per-port action is taken here.
# NOTE(review): the empty loop looks like a placeholder -- confirm intent.
#
# @return [Array] the key list that was iterated
def on_busy
  BayLog.debug("%s onBusy", agent)
  BayServer.anchorable_port_map.keys.each { |rd| }
end
|
#on_free ⇒ Object
230
231
232
233
234
235
236
237
238
239
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 230
# Hook invoked when the agent is no longer busy.
#
# Logs a debug line including the agent's +aborted+ flag. If the agent is
# aborted nothing else happens; otherwise the keys of
# BayServer.anchorable_port_map are walked with an empty block.
# NOTE(review): the empty loop looks like a placeholder -- confirm intent.
#
# @return [Array, nil] the iterated key list, or nil when the agent is aborted
def on_free
  BayLog.debug("%s onFree aborted=%s", agent, agent.aborted)
  return if agent.aborted

  BayServer.anchorable_port_map.keys.each { |rd| }
end
|
#on_timer ⇒ Object
250
251
252
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 250
# Timer callback; does nothing and returns nil. The constructor registers
# this object as a timer handler on the agent.
#
# @return [nil]
def on_timer
  nil
end
|
#process_data ⇒ Object
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 261
# Runs one pass over the running list, calling +lap+ on every lapper.
#
# For each lapper (visited from the tail of the list towards the head) the
# lapper's state is fetched, +lap+ is called, and +access+ is called on the
# state. The truthiness of every +lap+ result is and-ed together.
#
# When every lapper reported a truthy "spun" result the spin counter is
# incremented, and once it exceeds 10 consecutive such passes the thread
# sleeps for 10 ms to avoid a hot loop. Any pass in which at least one lapper
# did not spin resets the counter.
#
# Fix: the counter reset used to be the +else+ of the "> 10" check, which
# zeroed the counter on every all-spun pass and made the sleep unreachable.
#
# @return [Boolean] false when the running list is empty (nothing was done),
#   true otherwise
def process_data
  return false if is_empty

  all_spun = true
  # Index-based walk from the tail; presumably chosen so that a lapper may
  # drop itself from the list during +lap+ -- TODO confirm.
  @running_list.length.downto(1) do |i|
    lpr = @running_list[i - 1]
    st = lpr.state
    spun = lpr.lap
    st.access
    all_spun &&= spun
  end

  if all_spun
    @spin_count += 1
    sleep(0.01) if @spin_count > 10
  else
    @spin_count = 0
  end
  true
end
|
#req_accept(rd) ⇒ Object
119
120
121
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 119
# Accept requests are not supported by this multiplexer.
#
# @param rd rudder (unused)
# @raise [NotImplementedError] always
def req_accept(rd)
  raise NotImplementedError
end
|
#req_close(rd) ⇒ Object
178
179
180
181
182
183
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 178
# Requests that the given rudder be closed.
#
# Looks up the rudder's state, marks it as closing, closes the rudder and
# then sends a "closed" letter to the agent carrying the state's id and
# rudder.
#
# When no state is registered for +rd+ an error is logged and nothing else
# happens (previously this raised NoMethodError on nil); this mirrors the
# handling in req_read.
#
# NOTE(review): close_rudder is called with +rd+ although this class's
# close_rudder names its parameter +st+ and forwards it to
# remove_from_running_list -- confirm which object is expected there.
#
# @param rd the rudder to close
# @return the result of @agent.send_closed_letter, or nil for an unknown rudder
def req_close(rd)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.error("%s Invalid rudder", self)
    return
  end

  st.closing = true
  close_rudder(rd)
  @agent.send_closed_letter(st.id, st.rudder, self, false)
end
|
#req_connect(rd, adr) ⇒ Object
123
124
125
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 123
# Connect requests are not supported by this multiplexer.
#
# @param rd rudder (unused)
# @param adr address (unused)
# @raise [NotImplementedError] always
def req_connect(rd, adr)
  raise NotImplementedError
end
|
#req_end(rd) ⇒ Object
173
174
175
176
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 173
# Marks the state of the given rudder as final by setting its +finale+ flag.
#
# When no state is registered for +rd+ an error is logged and nothing else
# happens (previously this raised NoMethodError on nil); this mirrors the
# handling in req_read.
#
# @param rd the rudder whose state should be flagged
# @return [true, nil] true after setting the flag, nil for an unknown rudder
def req_end(rd)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.error("%s Invalid rudder", self)
    return
  end

  st.finale = true
end
|
#req_read(rd) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 127
# Requests a read on the given rudder.
#
# Looks up the rudder's state; an unknown rudder is logged as an error and
# ignored. Otherwise the +reading+ flag is tested and set under the state's
# reading lock, and only the caller that flipped it from falsy to true goes
# on to call next_read (outside the lock).
#
# @param rd the rudder to read from
# @return the result of next_read, or nil when nothing was started
def req_read(rd)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.error("%s Invalid rudder", self)
    return
  end

  # The block's value tells us whether this call switched reading on.
  start_reading = st.reading_lock.synchronize do
    next false if st.reading

    st.reading = true
  end

  next_read(st) if start_reading
end
|
#req_write(rd, buf, adr, tag, &lis) ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 147
# Queues a write for the given rudder.
#
# Wraps the arguments in a WriteUnit, appends it to the state's write queue
# under the queue lock, calls +access+ on the state, and then -- under the
# writing lock -- sets the +writing+ flag if it was not already set. Only the
# call that set the flag goes on to invoke next_write.
#
# An unknown rudder is logged as a warning, the listener (if any) is invoked
# once, and the method returns without queueing anything.
#
# Fix: the unknown-rudder branch previously fell through and dereferenced the
# nil state, and called the listener unconditionally even when no block was
# given.
#
# @param rd the rudder to write to
# @param buf data handed to WriteUnit
# @param adr address handed to WriteUnit
# @param tag tag handed to WriteUnit
# @param lis optional listener block, handed to WriteUnit
# @return the result of next_write, or nil when nothing was started
def req_write(rd, buf, adr, tag, &lis)
  st = get_rudder_state(rd)
  if st.nil?
    BayLog.warn("Invalid rudder")
    lis&.call
    return
  end

  unt = WriteUnit.new(buf, adr, tag, &lis)
  st.write_queue_lock.synchronize do
    st.write_queue << unt
  end
  st.access

  need_write = false
  st.writing_lock.synchronize do
    unless st.writing
      need_write = true
      st.writing = true
    end
  end

  next_write(st) if need_write
end
|
#shutdown ⇒ Object
186
187
188
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 186
# Shuts the multiplexer down by delegating to +wakeup+.
#
# @return whatever wakeup returns
def shutdown
  wakeup()
end
|
#to_s ⇒ Object
111
112
113
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 111
# Short label for logs: "SpnMpx[...]" with the interpolated @agent inside
# the brackets.
#
# @return [String]
def to_s
  "SpnMpx[#{@agent}]"
end
|
#use_async_api ⇒ Object
194
195
196
|
# File 'lib/baykit/bayserver/agent/multiplexer/spin_multiplexer.rb', line 194
# Always reports false for this multiplexer.
#
# @return [false]
def use_async_api
  false
end
|