Class: ThrottleQueue
- Inherits: Object
- Defined in: lib/throttle-queue.rb
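Overview
A queue that runs submitted blocks at a limited rate (per second). Foreground work is queued ahead of all background work.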
Defined Under Namespace
Classes: FG, PriorityQueue
Instance Method Summary
- #background(id, &block) ⇒ Object
  Adds work to the queue to run in the background, and returns immediately.
- #foreground(id, &block) ⇒ Object
  Adds work to the queue ahead of all background work, and blocks until the given block has been called.
- #idle? ⇒ Boolean
  Returns true if there is nothing queued and no threads are running.
- #initialize(limit) ⇒ ThrottleQueue (constructor)
  Creates a new ThrottleQueue with the given rate limit (per second).
- #shutdown ⇒ Object
  Signals the queue to stop processing and shut down.
- #wait(timeout = nil) ⇒ Object
  Blocks the calling thread while the queue processes work.
Constructor Details
#initialize(limit) ⇒ ThrottleQueue
Creates a new ThrottleQueue with the given rate limit (per second).
# File 'lib/throttle-queue.rb', line 14
def initialize(limit)
  raise "refusing to do zero work per second" if limit <= 0
  @limit = limit
  @queue = PriorityQueue.new
  @mutex = Mutex.new
  @pausing = ConditionVariable.new
  @idle = ConditionVariable.new
  @in_flight = nil
  @processing_thread = nil
  @items = {}
  @throttling = nil
  @state = :idle
  @t0 = Time.now
end
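The constructor rejects a non-positive limit before any state is set up. As a rough sketch of what a per-second limit implies, the hypothetical helper below converts a limit into the average gap between items. `min_interval` is not part of ThrottleQueue, and how the library actually paces its work is not shown in this section.

```ruby
# Hypothetical helper for illustration; not part of ThrottleQueue.
# Converts a per-second rate limit into the average gap between items.
def min_interval(limit)
  # Same guard condition as the constructor, raised here as ArgumentError.
  raise ArgumentError, "refusing to do zero work per second" if limit <= 0

  1.0 / limit
end

min_interval(4)   # 0.25 seconds between items
min_interval(0.5) # 2.0 seconds between items
```

A fractional limit such as 0.5 passes the guard, which suggests rates slower than one item per second are allowed.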
Instance Method Details
#background(id, &block) ⇒ Object
Adds work to the queue to run in the background, and returns immediately.
If the block takes an argument, it will be passed the same id used to queue the work.
# File 'lib/throttle-queue.rb', line 58
def background(id, &block)
  @mutex.synchronize {
    if id != @in_flight
      @items[id] = block
      @queue.background id
      run
    end
  }
end
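The code that eventually invokes the stored block is not shown in this section. One plausible way to pass the id only to blocks that take an argument is an arity check, sketched below with a hypothetical helper `call_with_id` that is not part of ThrottleQueue.

```ruby
# Hypothetical helper for illustration; not part of ThrottleQueue.
# Calls the block with the id only when the block declares parameters.
def call_with_id(id, &block)
  block.arity.zero? ? block.call : block.call(id)
end

call_with_id("thumb-1") { |id| "rendering #{id}" } # => "rendering thumb-1"
call_with_id("thumb-1") { "rendering" }            # => "rendering"
```

Note from the source above that a background request whose id is already in flight is silently ignored, and that queuing the same id again replaces the stored block.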
#foreground(id, &block) ⇒ Object
Adds work to the queue ahead of all background work, and blocks until the given block has been called.
Will preempt an id of the same value in either the background or foreground queues.
If the block takes an argument, it will be passed the same id used to queue the work.
# File 'lib/throttle-queue.rb', line 75
def foreground(id, &block)
  t = nil
  @mutex.synchronize {
    if id == @in_flight
      t = @processing_thread unless @processing_thread == Thread.current
    else
      b = @items[id]
      b.kill if b.is_a? FG
      t = @items[id] = FG.new block, self
      @queue.foreground id
      run
    end
  }
  t.join if t
end
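The ordering described above (foreground ids run before any background id, and a foreground request takes over an id already waiting in the background) can be sketched with two arrays. `TwoTierQueue` is a hypothetical stand-in written for this example; the library's own PriorityQueue class is not shown here and may work differently.

```ruby
# Hypothetical stand-in for illustration; not the library's PriorityQueue.
class TwoTierQueue
  def initialize
    @fg = []
    @bg = []
  end

  # Queue an id at background priority unless it is already queued.
  def background(id)
    @bg << id unless @fg.include?(id) || @bg.include?(id)
  end

  # Queue an id at foreground priority, removing any background entry for it.
  def foreground(id)
    @bg.delete(id)
    @fg << id unless @fg.include?(id)
  end

  # Next id to process: foreground first, then background, nil when empty.
  # (Assumes ids are never nil or false.)
  def pop
    @fg.shift || @bg.shift
  end

  def empty?
    @fg.empty? && @bg.empty?
  end
end

q = TwoTierQueue.new
q.background(:a)
q.background(:b)
q.foreground(:b) # promotes :b ahead of :a
q.foreground(:c)
order = []
order << q.pop until q.empty? # order is [:b, :c, :a]
```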
#idle? ⇒ Boolean
Returns true if there is nothing queued and no threads are running.
# File 'lib/throttle-queue.rb', line 41
def idle?
  @state == :idle
end
#shutdown ⇒ Object
Signals the queue to stop processing and shut down.
Items still in the queue are dropped. Any item currently in flight will finish.
# File 'lib/throttle-queue.rb', line 35
def shutdown
  @queue.shutdown
  @pausing.signal
end
#wait(timeout = nil) ⇒ Object
Blocks the calling thread while the queue processes work.
Returns after the timeout has expired, or after the queue returns to the idle state.
# File 'lib/throttle-queue.rb', line 48
def wait(timeout = nil)
  @mutex.synchronize {
    @idle.wait(@mutex, timeout) unless idle?
  }
end
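The wait-with-timeout pattern above can be tried in isolation. The sketch below uses a hypothetical `IdleFlag` class (not part of ThrottleQueue) that mirrors the same mutex and condition-variable arrangement; unlike the documented `wait`, it also returns whether the idle state was reached, purely so the caller can tell a timeout from a wake-up.

```ruby
# Hypothetical class for illustration; not part of ThrottleQueue.
class IdleFlag
  def initialize
    @mutex = Mutex.new
    @idle = ConditionVariable.new
    @state = :busy
  end

  # Mark the work as done and wake every waiting thread.
  def finish
    @mutex.synchronize do
      @state = :idle
      @idle.broadcast
    end
  end

  def idle?
    @state == :idle
  end

  # Block until idle or until the timeout (in seconds) expires.
  # Returns true if idle was reached, false otherwise.
  def wait(timeout = nil)
    @mutex.synchronize do
      @idle.wait(@mutex, timeout) unless idle?
    end
    idle?
  end
end

flag = IdleFlag.new
timed_out = !flag.wait(0.01) # nothing has finished yet, so this times out
worker = Thread.new { flag.finish }
worker.join
reached_idle = flag.wait(1)  # already idle, returns without blocking
```

A single `unless` check, as used here and in the source, does not re-test the condition after waking; a loop with a deadline is the usual choice when spurious wake-ups matter.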