Class: ThrottleQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/throttle-queue.rb

Overview

}

Defined Under Namespace

Classes: FG, PriorityQueue

Instance Method Summary collapse

Constructor Details

#initialize(limit) ⇒ ThrottleQueue

Creates a new ThrottleQueue with the given rate limit (per second).



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/throttle-queue.rb', line 14

def initialize(limit)
	raise "refusing to do zero work per second" if limit <= 0
	@limit = limit

	@queue = PriorityQueue.new

	@mutex = Mutex.new
	@pausing = ConditionVariable.new
	@idle = ConditionVariable.new
	@in_flight = nil
	@processing_thread = nil
	@items = {}

	@throttling = nil
	@state = :idle
	@t0 = Time.now
end

Instance Method Details

#background(id, &block) ⇒ Object

Adds work to the queue to run in the background, and returns immediately.

If the block takes an argument, it will be passed the same id used to queue the work.



58
59
60
61
62
63
64
65
66
# File 'lib/throttle-queue.rb', line 58

def background(id, &block)
	@mutex.synchronize {
		if id != @in_flight
			@items[id] = block
			@queue.background id
			run
		end
	}
end

#foreground(id, &block) ⇒ Object

Adds work to the queue ahead of all background work, and blocks until the given block has been called.

Will preempt an id of the same value in either the background or foreground queues.

If the block takes an argument, it will be passed the same id used to queue the work.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/throttle-queue.rb', line 75

def foreground(id, &block)
	t = nil
	@mutex.synchronize {
		if id == @in_flight
			t = @processing_thread unless @processing_thread == Thread.current
		else
			b = @items[id]
			b.kill if b.is_a? FG

			t = @items[id] = FG.new block, self

			@queue.foreground id
			run
		end
	}
	t.join if t
end

#idle?Boolean

Returns true if there is nothing queued and no threads are running

Returns:

  • (Boolean)


41
42
43
# File 'lib/throttle-queue.rb', line 41

def idle?
	@state == :idle
end

#shutdownObject

Signals the queue to stop processing and shutdown.

Items still in the queue are dropped. Any item currently in flight will finish.



35
36
37
38
# File 'lib/throttle-queue.rb', line 35

def shutdown
	@queue.shutdown
	@pausing.signal
end

#wait(timeout = nil) ⇒ Object

Blocks the calling thread while the queue processes work.

Returns after the timeout has expired, or after the queue returns to the idle state.



48
49
50
51
52
# File 'lib/throttle-queue.rb', line 48

def wait(timeout = nil)
	@mutex.synchronize {
		@idle.wait(@mutex, timeout) unless idle?
	}
end