Class: Rbgo::Channel::BufferChan
- Inherits:
-
SizedQueue
- Object
- SizedQueue
- Rbgo::Channel::BufferChan
show all
- Includes:
- Chan
- Defined in:
- lib/rbgo/select_chan.rb
Overview
Instance Method Summary
collapse
Methods included from Chan
after, #each, new, perform, tick
Constructor Details
Returns a new instance of BufferChan.
245
246
247
248
249
250
251
|
# File 'lib/rbgo/select_chan.rb', line 245
# Builds a buffered channel that holds at most +max+ queued values.
#
# @param max [Integer] capacity handed unchanged to the superclass constructor
def initialize(max)
  # Bare `super` forwards the method's own argument list, i.e. `max`.
  super

  # Condition variable and the lock it is waited on by the other methods
  # of this class.
  @cond = ConditionVariable.new
  @mutex = Mutex.new

  # `ios=` and `register_mutex=` are writers defined outside this block
  # (presumably by the included Chan module — confirm); they start out as an
  # empty list and a fresh lock respectively.
  self.ios = []
  self.register_mutex = Mutex.new
end
|
Instance Method Details
#clear ⇒ Object
299
300
301
302
303
|
# File 'lib/rbgo/select_chan.rb', line 299
# Discards every value currently buffered in the channel.
#
# The work is done while holding @mutex and is followed by a broadcast on
# @cond, exactly like #pop and #close. Without that, a blocking #push that is
# parked on @cond because the buffer was full would not be woken even though
# clearing has just freed capacity.
#
# @return [self]
def clear
  @mutex.synchronize do
    super
    # `notify` is defined outside this block (presumably by Chan — confirm).
    notify
    # Wake threads waiting in #push / #pop so they re-check their conditions.
    @cond.broadcast
  end
  self
end
|
#close ⇒ Object
305
306
307
308
309
310
311
312
|
# File 'lib/rbgo/select_chan.rb', line 305
# Closes the channel and wakes every thread waiting on it.
#
# @return [self]
def close
  @mutex.synchronize do
    super
    # `notify` is defined outside this block (presumably by Chan — confirm).
    notify
    # Blocked #push / #pop callers loop on `closed?`, so they must be woken
    # to observe the new state.
    @cond.broadcast
  end
  self
end
|
#pop(nonblock = false) ⇒ Object
Also known as:
deq, shift
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
|
# File 'lib/rbgo/select_chan.rb', line 274
# Takes the next value out of the channel.
#
# The result is a two-element array `[value, ok]`. `ok` is false only when
# nothing could be received because the channel is closed and empty; a value
# that was still buffered when the channel got closed is returned with
# `ok == true`, in both the blocking and the non-blocking mode.
#
# @param nonblock [Boolean] when truthy, do not wait for a value
# @return [Array(Object, Boolean)] the received value (nil if none) and the ok flag
# @raise [ThreadError] in non-blocking mode when the channel is empty but not closed
def pop(nonblock = false)
  @mutex.synchronize do
    res = nil
    ok = true
    begin
      if nonblock
        ok = false if empty? && closed?
        res = super(true)
      else
        # Wait until there is something to take or the channel is closed.
        @cond.wait(@mutex) while empty? && !closed?
        # Only a closed *and drained* channel yields ok == false; checking
        # `closed?` alone would flag real buffered values as "not ok".
        ok = false if empty? && closed?
        res = super(false)
      end
      # `notify` is defined outside this block (presumably by Chan — confirm).
      notify
      # A slot may have been freed: wake threads waiting in #push / #pop.
      @cond.broadcast
    rescue ThreadError
      # A ThreadError on a closed channel is reported through the ok flag;
      # on an open channel it is propagated to the caller.
      raise unless closed?
      ok = false
    end
    [res, ok]
  end
end
|
#push(obj, nonblock = false) ⇒ Object
Also known as:
<<, enq
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
|
# File 'lib/rbgo/select_chan.rb', line 253
# Puts +obj+ into the channel.
#
# In blocking mode the caller waits on @cond while the buffer length equals
# `queue_max` and the channel is not closed, then hands the value to the
# superclass. In non-blocking mode the value is handed over immediately.
#
# NOTE(review): `queue_max` is not defined in this block; presumably it is
# provided elsewhere in the project as the channel capacity — confirm.
#
# @param obj [Object] value to enqueue
# @param nonblock [Boolean] when truthy, do not wait for free capacity
# @return [self]
# @raise [ClosedQueueError] when a ThreadError occurs and the channel is closed
# @raise [ThreadError] re-raised unchanged when the channel is still open
def push(obj, nonblock = false)
  @mutex.synchronize do
    begin
      if nonblock
        super(obj, true)
      else
        # Park until there is room or the channel has been closed.
        @cond.wait(@mutex) until length != queue_max || closed?
        super(obj, false)
      end
      # `notify` is defined outside this block (presumably by Chan — confirm).
      notify
      # Wake threads waiting in #push / #pop so they re-check their conditions.
      @cond.broadcast
      self
    rescue ThreadError
      # Still open: propagate the original error untouched.
      raise unless closed?
      raise ClosedQueueError
    end
  end
end
|