Class: Backports::FilteredQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/backports/ractor/filtered_queue.rb

Overview

Like ::Queue, but with

  • filtering

  • timeout

  • raises on closed queues

Independent from other Ractor related backports.

Defined Under Namespace

Classes: ClosedQueueError, TimeoutError

Constant Summary collapse

CONSUME_ON_ESCAPE =
true

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FilteredQueue



34
35
36
37
38
39
40
# File 'lib/backports/ractor/filtered_queue.rb', line 34

# Sets up an empty, open queue with no waiters.
def initialize
  # Synchronization primitives: the mutex guards the state below; the
  # condition variable is signalled by #<< and broadcast by #close.
  @mutex = ::Mutex.new
  @received = ::ConditionVariable.new

  # Queue state.
  @queue = []
  @closed = false
  @num_waiting = 0
end

Instance Attribute Details

#num_waiting ⇒ Object (readonly)

Returns the value of attribute num_waiting.



31
32
33
# File 'lib/backports/ractor/filtered_queue.rb', line 31

# Reader for the @num_waiting counter, which #initialize sets to 0.
#
# NOTE(review): presumably the number of callers currently blocked waiting
# for a message; the code that updates the counter is not shown here.
#
# @return [Object] the current value of @num_waiting
def num_waiting
  @num_waiting
end

Instance Method Details

#<<(x) ⇒ Object Also known as: push



54
55
56
57
58
59
60
61
# File 'lib/backports/ractor/filtered_queue.rb', line 54

# Appends +x+ to the queue, wrapped in a Message, and signals one waiter.
#
# NOTE(review): +ensure_open+ is defined elsewhere; presumably it raises
# when the queue has been closed -- confirm against its definition.
#
# @param x [Object] the value to enqueue
# @return [self] so that pushes can be chained
def <<(x)
  @mutex.synchronize do
    ensure_open
    message = Message.new(x)
    @queue.push(message)
    @received.signal
  end

  self
end

#clear ⇒ Object



64
65
66
67
68
69
# File 'lib/backports/ractor/filtered_queue.rb', line 64

# Removes every pending message from the queue while holding the mutex.
#
# @return [self]
def clear
  @mutex.synchronize { @queue.clear }
  self
end

#close ⇒ Object



42
43
44
45
46
47
48
# File 'lib/backports/ractor/filtered_queue.rb', line 42

# Marks the queue as closed and wakes every thread waiting on @received.
#
# @return [self]
def close
  @mutex.lock
  begin
    @closed = true
    # Broadcast rather than signal: all waiters are woken, not just one.
    @received.broadcast
  ensure
    @mutex.unlock
  end

  self
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
# File 'lib/backports/ractor/filtered_queue.rb', line 50

# Tells whether the queue has been closed.
#
# Returns @closed as-is, without taking the mutex: it is false after
# #initialize and true once #close has run.
#
# @return [Boolean]
def closed?
  @closed
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
88
89
90
91
# File 'lib/backports/ractor/filtered_queue.rb', line 85

# Tells whether no message is currently available.
#
# Calls +available!+ while holding the mutex and negates its result.
# NOTE(review): +available!+ is defined elsewhere; the bang suggests it may
# also modify the queue -- confirm against its definition.
#
# @return [Boolean] true when +available!+ returns nil or false
def empty?
  !@mutex.synchronize { available! }
end

#pop(timeout: nil, &block) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/backports/ractor/filtered_queue.rb', line 71

# Removes one message from the queue and returns its value.
#
# Without a block, the message obtained from +acquire!+ is passed to
# +consume!+ and its value is returned while the mutex is still held.
# With a block, +filter?+ is called with the message and the block after
# the mutex has been released; the message's value is returned when
# +filter?+ is truthy, otherwise the loop runs another round.
#
# @param timeout [Numeric, nil] seconds to wait; converted into an absolute
#   wall-clock deadline that is handed to +acquire!+. With +nil+ no deadline
#   is computed.
# @yield optional filter, forwarded to +filter?+. NOTE(review): +filter?+ is
#   defined elsewhere; presumably it yields the message value and takes care
#   of consuming an accepted message -- confirm against its definition.
# @return [Object] the +value+ of the selected message
#
# NOTE(review): timeout and closed-queue errors presumably originate in
# +acquire!+, which is not visible here -- confirm.
def pop(timeout: nil, &block)
  msg = nil
  exclude = [] if block # exclusion list of messages rejected by this call
  # Absolute deadline in epoch seconds; stays nil when no timeout was given.
  timeout_time = timeout + Time.now.to_f if timeout
  while true do # rubocop:disable Style/InfiniteLoop, Style/WhileUntilDo
    @mutex.synchronize do
      # NOTE(review): re-entrancy handling lives in helpers not shown here.
      reenter if reentrant?
      msg = acquire!(timeout_time, exclude)
      # Unfiltered pop: consume and return right away, still under the mutex.
      return consume!(msg).value unless block
    end
    # Filtered pop: the filter is evaluated outside the synchronized section.
    return msg.value if filter?(msg, &block)
  end
end