Class: ThrottleQueue
- Inherits:
-
Object
- Object
- ThrottleQueue
- Defined in:
- lib/throttle-queue/single-process.rb,
lib/throttle-queue/multi-process.rb
Overview
}
Defined Under Namespace
Classes: FG, MultiProcess, PriorityQueue
Instance Method Summary collapse
-
#background(id, &block) ⇒ Object
Adds work to the queue to run in the background, and returns immediately.
-
#foreground(id, &block) ⇒ Object
Adds work to the queue ahead of all background work, and blocks until the given block has been called.
-
#idle? ⇒ Boolean
Returns true if there is nothing queued and no threads are running.
-
#initialize(limit) ⇒ ThrottleQueue
constructor
Creates a new ThrottleQueue with the given rate limit (per second).
-
#shutdown ⇒ Object
Signals the queue to stop processing and shutdown.
-
#wait(timeout = nil) ⇒ Object
Blocks the calling thread while the queue processes work.
Constructor Details
#initialize(limit) ⇒ ThrottleQueue
Creates a new ThrottleQueue with the given rate limit (per second).
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/throttle-queue/single-process.rb', line 14 def initialize(limit) raise "refusing to do zero work per second" if limit <= 0 @limit = limit @queue = PriorityQueue.new @mutex = Mutex.new @pausing = ConditionVariable.new @idle = ConditionVariable.new @in_flight = nil @processing_thread = nil @items = {} @throttling = nil @state = :idle @t0 = Time.now end |
Instance Method Details
#background(id, &block) ⇒ Object
Adds work to the queue to run in the background, and returns immediately.
If the block takes an argument, it will be passed the same id used to queue the work.
58 59 60 61 62 63 64 65 66 |
# File 'lib/throttle-queue/single-process.rb', line 58 def background(id, &block) @mutex.synchronize { unless @items.has_key? id @items[id] = block @queue.background id run end } end |
#foreground(id, &block) ⇒ Object
Adds work to the queue ahead of all background work, and blocks until the given block has been called.
Will preempt an id of the same value in the background queue, and wait on an id of the same value already in the foreground queue.
If the block takes an argument, it will be passed the same id used to queue the work.
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/throttle-queue/single-process.rb', line 76 def foreground(id, &block) t = nil @mutex.synchronize { if id == @in_flight t = @processing_thread unless @processing_thread == Thread.current else t = @items[id] unless t.is_a? FG t = @items[id] = FG.new block, self @queue.foreground id run end end } t.join if t end |
#idle? ⇒ Boolean
Returns true if there is nothing queued and no threads are running
41 42 43 |
# File 'lib/throttle-queue/single-process.rb', line 41 def idle? @state == :idle end |
#shutdown ⇒ Object
Signals the queue to stop processing and shutdown.
Items still in the queue are dropped. Any item currently in flight will finish.
35 36 37 38 |
# File 'lib/throttle-queue/single-process.rb', line 35 def shutdown @queue.shutdown @pausing.signal end |
#wait(timeout = nil) ⇒ Object
Blocks the calling thread while the queue processes work.
Returns after the timeout has expired, or after the queue returns to the idle state.
48 49 50 51 52 |
# File 'lib/throttle-queue/single-process.rb', line 48 def wait(timeout = nil) @mutex.synchronize { @idle.wait(@mutex, timeout) unless idle? } end |