Class: Proco::Queue::Base

Inherits:
Object
  • Object
show all
Includes:
MT::Base
Defined in:
lib/proco/queue/base.rb

Direct Known Subclasses

BatchQueue, MultiQueue, SingleQueue

Defined Under Namespace

Classes: Invalidated

Instance Method Summary collapse

Methods included from MT::Base

#broadcast, #do_when, #signal, #synchronize, #try_when, #wait_until

Constructor Details

#initialize(size, delay) ⇒ Base

Returns a new instance of Base.



15
16
17
18
19
20
21
# File 'lib/proco/queue/base.rb', line 15

def initialize size, delay
  super()
  @size  = size
  @delay = delay || 0
  @items = []
  @valid = true
end

Instance Method Details

#invalidateObject



23
24
25
26
27
# File 'lib/proco/queue/base.rb', line 23

def invalidate
  broadcast do
    @valid = false
  end
end

#push(item) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/proco/queue/base.rb', line 29

def push item
  @mtx.lock
  while true
    raise Invalidated unless @valid
    break if @items.length < @size
    @cv.wait @mtx
  end
  push_impl item
ensure
  @cv.broadcast
  @mtx.unlock
end

#takeObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/proco/queue/base.rb', line 42

def take
  @mtx.lock
  wait_at = nil
  while true
    empty = @items.empty?
    unless empty
      if wait_at && @delay > 0
        n = Time.now
        t = wait_at + @delay
        t += @delay * ((n - t) / @delay).to_i if t < n
        t += @delay if t < n

        # Haven't took anything.
        # No need to broadcast to blocked pushers
        @mtx.unlock
        sleep t - n
        @mtx.lock
      end
      break
    end
    return nil unless @valid
    wait_at = Time.now
    @cv.wait @mtx
  end
  take_impl
ensure
  @cv.broadcast
  @mtx.unlock
end