Class: Rbgo::Channel::BufferChan

Inherits:
SizedQueue
  • Object
show all
Includes:
Chan
Defined in:
lib/rbgo/select_chan.rb

Overview

BufferChan

Instance Method Summary collapse

Methods included from Chan

after, #each, new, perform, tick

Constructor Details

#initialize(max) ⇒ BufferChan

Returns a new instance of BufferChan.



246
247
248
249
250
251
252
# File 'lib/rbgo/select_chan.rb', line 246

# Builds a buffered channel.
#
# @param max [Integer] capacity, forwarded unchanged to the superclass constructor
def initialize(max)
  super # bare super forwards `max` as-is
  # Lock and condition variable that push/pop/close of this class wait on.
  @mutex, @cond = Mutex.new, ConditionVariable.new
  # These two setters are not defined in this class; presumably they come
  # from the included Chan module -- confirm against its definition.
  self.ios = []
  self.register_mutex = Mutex.new
end

Instance Method Details

#clear ⇒ Object



300
301
302
303
304
# File 'lib/rbgo/select_chan.rb', line 300

# Removes every buffered object from the channel.
#
# Runs while holding @mutex and broadcasts on @cond afterwards, so that
# producers parked in #push on a full buffer re-check the free space once the
# buffer has been emptied. Holding @mutex also prevents the buffer from being
# emptied between the emptiness check and the dequeue performed by #pop.
#
# @return [self]
def clear
  @mutex.synchronize do
    super
    notify
    # Wake threads waiting on @cond; the buffer now has room.
    @cond.broadcast
    self
  end
end

#close ⇒ Object



306
307
308
309
310
311
312
313
# File 'lib/rbgo/select_chan.rb', line 306

# Closes the channel.
#
# While holding @mutex this delegates to the superclass +close+, calls
# +notify+, and broadcasts on @cond so that threads waiting inside #push or
# #pop wake up and observe +closed?+.
#
# @return [self]
def close
  @mutex.synchronize do
    super
    notify
    @cond.broadcast
  end
  self
end

#pop(nonblock = false) ⇒ Object Also known as: deq, shift



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# File 'lib/rbgo/select_chan.rb', line 275

# Receives the next object from the channel.
#
# Returns a two-element array +[value, ok]+. +ok+ is +false+ only when the
# channel is closed and has nothing left to deliver (+value+ is then +nil+).
# Objects that are still buffered when the channel is closed are delivered
# with +ok == true+, in blocking and non-blocking mode alike.
#
# @param nonblock [Boolean] when true, never wait for an object to arrive
# @return [Array(Object, Boolean)] the received object and the ok flag
# @raise [ThreadError] when the superclass +pop+ raises it and the channel
#   is not closed (e.g. a non-blocking pop on an empty, open channel)
def pop(nonblock = false)
  @mutex.synchronize do
    res = nil
    ok  = true
    ok  = false if empty? && closed?
    begin
      if nonblock
        res = super(true)
      else
        # Wait until an object is available or the channel gets closed.
        @cond.wait(@mutex) while empty? && !closed?
        # Re-check after waiting: only "closed with nothing buffered" is a
        # failed receive. A closed channel that still holds objects keeps
        # delivering them with ok == true, matching the non-blocking branch.
        ok  = false if empty? && closed?
        res = super(false)
      end
      notify
      # A slot was freed; wake producers waiting in #push.
      @cond.broadcast
    rescue ThreadError
      raise unless closed?
      ok = false
    end
    [res, ok]
  end
end

#push(obj, nonblock = false) ⇒ Object Also known as: <<, enq



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/rbgo/select_chan.rb', line 254

# Sends +obj+ into the channel.
#
# In blocking mode the caller waits on @cond until the buffer has free space
# or the channel is closed, then delegates to the superclass +push+.
#
# @param obj [Object] the object to enqueue
# @param nonblock [Boolean] when true, never wait for free space
# @return [self]
# @raise [ClosedQueueError] when the superclass +push+ raises ThreadError
#   while the channel is closed
# @raise [ThreadError] re-raised unchanged when the channel is not closed
def push(obj, nonblock = false)
  @mutex.synchronize do
    begin
      if nonblock
        super(obj, true)
      else
        # `>=` rather than `==`: should the buffer ever hold more than
        # queue_max objects (SizedQueue#max= allows lowering the capacity
        # while objects are buffered), an equality test would fall through
        # and the superclass push would block while @mutex is still held,
        # which keeps consumers from ever draining the buffer.
        @cond.wait(@mutex) while length >= queue_max && !closed?
        super(obj, false)
      end
      notify
      # An object is available; wake consumers waiting in #pop.
      @cond.broadcast
      self
    rescue ThreadError
      raise ClosedQueueError if closed?
      raise
    end
  end
end