Class: Purplelight::ByteQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/purplelight/queue.rb

Overview

Sized queue that tracks bytes to apply backpressure.

Instance Method Summary

Constructor Details

#initialize(max_bytes: 128 * 1024 * 1024) ⇒ ByteQueue

Returns a new instance of ByteQueue.



6
7
8
9
10
11
12
13
# File 'lib/purplelight/queue.rb', line 6

# Builds an empty, open queue.
#
# @param max_bytes [Integer] value stored in +@max_bytes+; defaults to
#   128 * 1024 * 1024 (128 MiB)
def initialize(max_bytes: 128 * 1024 * 1024)
  # Queue state: open, holding nothing.
  @closed = false
  @bytes = 0
  @queue = []
  @max_bytes = max_bytes

  # One mutex guards all state above; the condition variable is the single
  # wake-up channel used with it.
  @mutex = Mutex.new
  @cv = ConditionVariable.new
end

Instance Method Details

#close ⇒ Object



40
41
42
43
44
45
# File 'lib/purplelight/queue.rb', line 40

# Marks the queue as closed and wakes every thread waiting on the condition
# variable so it can re-evaluate its wait condition.
def close
  @mutex.lock
  begin
    @closed = true
    @cv.broadcast
  ensure
    # Always release the lock, mirroring what Mutex#synchronize guarantees.
    @mutex.unlock
  end
end

#pop ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/purplelight/queue.rb', line 26

# Removes and returns the item at the head of the queue.
#
# Blocks while the queue is empty and still open. Items that remain after
# the queue has been closed are still handed out; +nil+ is returned only
# when the queue is both closed and empty.
#
# @return [Object, nil] the dequeued item, or +nil+ when closed and empty
def pop
  @mutex.synchronize do
    # Sleep until there is something to take or the queue is closed.
    @cv.wait(@mutex) until @closed || !@queue.empty?
    # Still empty here means the wait ended because of the close.
    next nil if @queue.empty?

    payload, weight = @queue.shift
    @bytes -= weight
    # Freed capacity: let threads waiting on the condition variable re-check.
    @cv.broadcast
    payload
  end
end

#push(item, bytes:) ⇒ Object



15
16
17
18
19
20
21
22
23
24
# File 'lib/purplelight/queue.rb', line 15

# Enqueues +item+, charging +bytes+ against the queue's byte budget.
#
# Blocks while adding +bytes+ would push the tracked total above
# +@max_bytes+. Two conditions end the wait so a producer cannot hang
# forever:
#
# * The queue is closed while the producer is waiting: the producer raises,
#   exactly as if it had pushed to an already-closed queue.
# * The queue is empty: the item is accepted even when +bytes+ on its own
#   exceeds +@max_bytes+, because no amount of popping could ever make room
#   for it.
#
# @param item [Object] value to enqueue
# @param bytes [Integer] size accounted for +item+
# @raise [RuntimeError] with message 'queue closed' if the queue is closed
#   before the item could be enqueued
# @return [void]
def push(item, bytes:)
  @mutex.synchronize do
    loop do
      # Re-checked after every wake-up: #close broadcasts to waiting
      # producers, and they must stop rather than go back to sleep.
      raise 'queue closed' if @closed
      break if @queue.empty? || (@bytes + bytes) <= @max_bytes

      @cv.wait(@mutex)
    end
    @queue << [item, bytes]
    @bytes += bytes
    # Wake consumers blocked in #pop.
    @cv.broadcast
  end
end

#size_bytes ⇒ Object



47
48
49
# File 'lib/purplelight/queue.rb', line 47

# Reports the number of bytes currently accounted for in the queue.
#
# The read happens under the queue's mutex.
#
# @return [Integer] current value of the byte counter
def size_bytes
  @mutex.synchronize do
    @bytes
  end
end