Class: Rbgo::Channel::BufferChan

Inherits:
SizedQueue
  • Object
show all
Includes:
Chan
Defined in:
lib/rbgo/select_chan.rb

Overview

BufferChan

Instance Method Summary collapse

Methods included from Chan

after, #each, new, perform, tick

Constructor Details

#initialize(max) ⇒ BufferChan

Returns a new instance of BufferChan.



245
246
247
248
249
250
251
# File 'lib/rbgo/select_chan.rb', line 245

# Creates a buffered channel on top of SizedQueue.
#
# @param max [Integer] capacity, forwarded unchanged to the superclass constructor
def initialize(max)
  # Bare `super` forwards `max` implicitly.
  super

  # Lock and condition variable that push/pop/close use to block and wake callers.
  @mutex = Mutex.new
  @cond = ConditionVariable.new

  # State consumed through accessors that are defined outside this class body
  # (presumably by the included Chan module — confirm there).
  self.ios = []
  self.register_mutex = Mutex.new
end

Instance Method Details

#clear ⇒ Object



299
300
301
302
303
# File 'lib/rbgo/select_chan.rb', line 299

# Removes every buffered item from the channel.
#
# The work is done under @mutex and followed by a broadcast on @cond, the same
# way close/pop/push do it: a producer blocked in #push waits on @cond while
# the buffer is full, so it has to be woken once clearing frees space.
#
# @return [self]
def clear
  @mutex.synchronize do
    super
    notify
    # Wake threads parked in push/pop so they re-check the queue state.
    @cond.broadcast
  end
  self
end

#close ⇒ Object



305
306
307
308
309
310
311
312
# File 'lib/rbgo/select_chan.rb', line 305

# Closes the channel while holding @mutex, then wakes everything waiting on it.
#
# @return [self]
def close
  @mutex.synchronize do
    super
    notify
    # Threads parked in push/pop must re-check `closed?`.
    @cond.broadcast
  end
  self
end

#pop(nonblock = false) ⇒ Object Also known as: deq, shift



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/rbgo/select_chan.rb', line 274

# Takes one item from the channel.
#
# `ok` is false only when the channel is closed AND drained; items that were
# still buffered when the channel got closed are delivered with `ok == true`,
# on both the blocking and the non-blocking path.
#
# @param nonblock [Boolean] when truthy, never wait for an item
# @return [Array(Object, Boolean)] `[item, ok]`; `item` is nil when `ok` is false
# @raise [ThreadError] non-blocking pop on an empty channel that is still open
def pop(nonblock = false)
  @mutex.synchronize do
    res = nil
    ok  = !(empty? && closed?)
    begin
      if nonblock
        res = super(true)
      else
        # Wait on our own condition variable so @mutex is released while blocked.
        @cond.wait(@mutex) while empty? && !closed?
        # Re-evaluate after waiting: closed but non-empty still yields a real item.
        ok  = false if empty? && closed?
        # Either an item is available or the queue is closed, so this does not block.
        res = super(false)
      end
      notify
      # A slot may have been freed; wake producers blocked in push.
      @cond.broadcast
    rescue ThreadError
      # Non-blocking pop on an empty queue: an error only if the channel is still open.
      raise unless closed?
      ok = false
    end
    [res, ok]
  end
end

#push(obj, nonblock = false) ⇒ Object Also known as: <<, enq



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/rbgo/select_chan.rb', line 253

# Appends +obj+ to the channel.
#
# A blocking push waits on @cond until the buffer has room or the channel is
# closed, then delegates to the superclass, which does the actual enqueue.
#
# @param obj [Object] item to enqueue
# @param nonblock [Boolean] when truthy, never wait for free space
# @return [self]
# @raise [ClosedQueueError] a ThreadError was raised and the channel is closed
# @raise [ThreadError] re-raised unchanged when the channel is still open
def push(obj, nonblock = false)
  @mutex.synchronize do
    begin
      unless nonblock
        # Park until a slot frees up or the channel gets closed.
        @cond.wait(@mutex) until length != queue_max || closed?
      end
      super(obj, nonblock ? true : false)
      notify
      # Wake consumers blocked in pop (and any other waiters).
      @cond.broadcast
      self
    rescue ThreadError
      # Keep the original error unless the channel is closed.
      raise unless closed?
      raise ClosedQueueError
    end
  end
end