Class: SizedQueue

Inherits:
Queue
  • Object
Defined in:
lib/thread.rb

Overview

This class represents queues with a fixed maximum capacity. A push blocks while the queue is full and resumes once space becomes available.

See Queue for an example of how a SizedQueue works.
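As a minimal sketch of the behaviour described above, the following runs one producer and one consumer over a bounded queue. The helper name run_bounded_pipeline and the :done sentinel are assumptions made for illustration; they are not part of the library.

```ruby
require 'thread'

# Hypothetical helper: runs one producer and one consumer over a SizedQueue
# of the given capacity and returns the items in the order they were consumed.
def run_bounded_pipeline(capacity, items)
  queue = SizedQueue.new(capacity)
  consumed = []

  producer = Thread.new do
    items.each { |item| queue.push(item) }  # blocks while the queue is full
    queue.push(:done)                       # sentinel telling the consumer to stop
  end

  consumer = Thread.new do
    while (item = queue.pop) != :done       # blocks while the queue is empty
      consumed << item
    end
  end

  [producer, consumer].each(&:join)
  consumed
end

p run_bounded_pipeline(2, [1, 2, 3, 4, 5])  # prints [1, 2, 3, 4, 5]
```

Because the capacity is 2, the producer can never run more than two items ahead of the consumer.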

Instance Method Summary

Methods inherited from Queue

#clear, #empty?, #length

Constructor Details

#initialize(max) ⇒ SizedQueue

Creates a fixed-length queue with a maximum size of max.

Raises:

  • (ArgumentError) if max is not greater than zero


# File 'lib/thread.rb', line 251

def initialize(max)
  raise ArgumentError, "queue size must be positive" unless max > 0
  @max = max
  @queue_wait = []
  @queue_wait.taint           # enable tainted communication
  super()
end
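The size check in the constructor can be exercised as follows. This is a small sketch; valid_queue_size? is a hypothetical helper introduced only for this example.

```ruby
require 'thread'

# Hypothetical helper: returns true if SizedQueue.new accepts the given size,
# false if it raises ArgumentError.
def valid_queue_size?(size)
  SizedQueue.new(size)
  true
rescue ArgumentError
  false
end

p valid_queue_size?(3)   # prints true
p valid_queue_size?(0)   # prints false
```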

Instance Method Details

#max ⇒ Object

Returns the maximum size of the queue.



# File 'lib/thread.rb', line 262

def max
  @max
end

#max=(max) ⇒ Object

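Sets the maximum size of the queue. If the new size is larger than the old one, up to as many threads blocked in push are woken as there are newly available slots.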



# File 'lib/thread.rb', line 269

def max=(max)
  diff = nil
  @mutex.synchronize {
    if max <= @max
      @max = max
    else
      diff = max - @max
      @max = max
    end
  }
  if diff
    diff.times do
      begin
        t = @queue_wait.shift
        t.run if t
      rescue ThreadError
        retry
      end
    end
  end
  max
end
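The effect of raising the limit on a blocked producer can be sketched like this. The helper name grow_and_unblock and the polling interval are assumptions chosen for the example.

```ruby
require 'thread'

# Hypothetical helper: fills a queue of capacity 1, starts a producer that
# blocks on a second push, then raises the capacity so the producer can finish.
# Returns the new maximum and the resulting queue length.
def grow_and_unblock
  queue = SizedQueue.new(1)
  queue.push(:first)                        # the queue is now full

  producer = Thread.new { queue.push(:second) }
  sleep 0.01 until queue.num_waiting == 1   # wait until the producer is blocked

  queue.max = 2                             # one more slot becomes available
  producer.join                             # the producer can now complete
  [queue.max, queue.length]
end

p grow_and_unblock   # prints [2, 2]
```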

#num_waiting ⇒ Object

Returns the number of threads waiting on the queue, counting both threads blocked in pop and threads blocked in push.



# File 'lib/thread.rb', line 355

def num_waiting
  @waiting.size + @queue_wait.size
end
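A short sketch of how the count changes as consumers block and are released. The helper count_blocked_consumers is hypothetical and exists only for this illustration.

```ruby
require 'thread'

# Hypothetical helper: starts n consumers on an empty queue, records how many
# are waiting, then releases them all. Returns the counts before and after.
def count_blocked_consumers(n)
  queue = SizedQueue.new(1)
  consumers = Array.new(n) { Thread.new { queue.pop } }

  sleep 0.01 until queue.num_waiting == n   # all consumers are blocked in pop
  waiting_before = queue.num_waiting

  n.times { |i| queue.push(i) }             # each push lets one consumer proceed
  consumers.each(&:join)

  [waiting_before, queue.num_waiting]
end

p count_blocked_consumers(3)   # prints [3, 0]
```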

#pop(*args) ⇒ Object Also known as: shift, deq

Retrieves data from the queue. If the queue is then below its maximum size and a thread is blocked in push, that thread is woken.



# File 'lib/thread.rb', line 327

def pop(*args)
  retval = super
  @mutex.synchronize {
    if @que.length < @max
      begin
        t = @queue_wait.shift
        t.wakeup if t
      rescue ThreadError
        retry
      end
    end
  }
  retval
end
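The following sketch shows a pop freeing a slot for a producer that was blocked on a full queue. The helper name pop_unblocks_producer is an assumption made for the example.

```ruby
require 'thread'

# Hypothetical helper: a producer blocks on a full queue until the main
# thread pops an item. Returns the two items in the order they were popped.
def pop_unblocks_producer
  queue = SizedQueue.new(1)
  queue.push(:a)                             # the queue is now full

  producer = Thread.new { queue.push(:b) }   # blocks until a slot is free
  sleep 0.01 until queue.num_waiting == 1

  first = queue.pop                          # frees the slot and wakes the producer
  producer.join
  [first, queue.pop]
end

p pop_unblocks_producer   # prints [:a, :b]
```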

#push(obj) ⇒ Object Also known as: <<, enq

Pushes obj to the queue. If there is no space left in the queue, waits until space becomes available.



# File 'lib/thread.rb', line 296

def push(obj)
  @mutex.synchronize{
    while true
      break if @que.length < @max
      @queue_wait.push Thread.current
      @mutex.sleep
    end

    @que.push obj
    begin
      t = @waiting.shift
      t.wakeup if t
    rescue ThreadError
      retry
    end
  }
end
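As a brief illustration, push and its aliases << and enq can be used interchangeably. The helper push_aliases_demo is hypothetical; it fills a queue to capacity and then drains it.

```ruby
require 'thread'

# Hypothetical helper: fills a queue of capacity 3 using push and its two
# aliases, then returns the length when full and the items in pop order.
def push_aliases_demo
  queue = SizedQueue.new(3)
  queue.push(1)
  queue << 2
  queue.enq(3)

  length_when_full = queue.length      # equals queue.max; another push would block
  drained = Array.new(3) { queue.pop }
  [length_when_full, drained]
end

p push_aliases_demo   # prints [3, [1, 2, 3]]
```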