Class: ThrottleQueue

Inherits:
Object
Defined in:
lib/throttle-queue/single-process.rb,
lib/throttle-queue/multi-process.rb

Defined Under Namespace

Classes: FG, MultiProcess, PriorityQueue

Constructor Details

#initialize(limit) ⇒ ThrottleQueue

Creates a new ThrottleQueue with the given rate limit (items per second). Raises a RuntimeError if limit is zero or negative.



# File 'lib/throttle-queue/single-process.rb', line 14

def initialize(limit)
  raise "refusing to do zero work per second" if limit <= 0
  @limit = limit

  @queue = PriorityQueue.new

  @mutex = Mutex.new
  @pausing = ConditionVariable.new
  @idle = ConditionVariable.new
  @in_flight = nil
  @processing_thread = nil
  @items = {}

  @throttling = nil
  @state = :idle
  @t0 = Time.now
end
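
The processing loop is not shown on this page, so the following is only a sketch of how a per-second limit could translate into spacing between items. The helper name pacing_delay and the fixed-interval approach are assumptions for illustration, not part of ThrottleQueue.

```ruby
# Hypothetical helper (not part of ThrottleQueue): returns how many seconds
# to sleep so that consecutive items start at least 1/limit seconds apart.
# `elapsed` is the time already spent since the previous item started.
def pacing_delay(limit, elapsed)
  # Same guard as the constructor above.
  raise "refusing to do zero work per second" if limit <= 0

  interval = 1.0 / limit          # minimum spacing between items
  [interval - elapsed, 0.0].max   # never sleep a negative amount
end

pacing_delay(4, 0.1)  # about 0.15 seconds left to wait
pacing_delay(4, 0.5)  # 0.0, the item is already late enough
```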

Instance Method Details

#background(id, &block) ⇒ Object

Adds work to the queue to run in the background, and returns immediately.

If the block takes an argument, it will be passed the same id used to queue the work.



# File 'lib/throttle-queue/single-process.rb', line 58

def background(id, &block)
  @mutex.synchronize {
    unless @items.has_key? id
      @items[id] = block
      @queue.background id
      run
    end
  }
end
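
The has_key? guard above means a second call with an id that is still queued is ignored. The stand-in class below (DedupSketch, a hypothetical name) reproduces only that guard and the optional id argument, using the standard library. The arity check is an assumption about how the id might be passed, since the dispatch code is not shown here.

```ruby
# Hypothetical stand-in, not the real ThrottleQueue: shows the dedup guard
# from #background and passing the id to blocks that accept an argument.
class DedupSketch
  def initialize
    @mutex = Mutex.new
    @items = {}   # id => block, as in the listing above
    @order = []   # simple FIFO in place of PriorityQueue
  end

  def background(id, &block)
    @mutex.synchronize {
      unless @items.has_key?(id)
        @items[id] = block
        @order << id
      end
    }
  end

  # Runs everything queued, in order, and returns the block results.
  def drain
    results = []
    loop do
      id, block = @mutex.synchronize {
        next_id = @order.shift
        [next_id, next_id.nil? ? nil : @items.delete(next_id)]
      }
      break if block.nil?
      # Assumption: only blocks that declare a parameter receive the id.
      results << (block.arity == 0 ? block.call : block.call(id))
    end
    results
  end
end

q = DedupSketch.new
q.background(:a) { |id| "first #{id}" }
q.background(:a) { |id| "second #{id}" }  # ignored: :a is already queued
q.background(:b) { "no arg" }
q.drain  # => ["first a", "no arg"]
```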

#foreground(id, &block) ⇒ Object

Adds work to the queue ahead of all background work, and blocks until the given block has been called.

If the same id is already in the background queue, this call preempts it. If the same id is already in the foreground queue, this call waits for that work instead of queuing it again.

If the block takes an argument, it will be passed the same id used to queue the work.



# File 'lib/throttle-queue/single-process.rb', line 76

def foreground(id, &block)
  t = nil
  @mutex.synchronize {
    if id == @in_flight
      t = @processing_thread unless @processing_thread == Thread.current
    else
      t = @items[id]
      unless t.is_a? FG
        t = @items[id] = FG.new block, self
        @queue.foreground id
        run
      end
    end
  }
  t.join if t
end
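
The ordering described above can be sketched with two lanes. PriorityQueue's internals are not shown on this page, so the class below (TwoLaneSketch, a hypothetical name) is an assumption about the behavior, not the library's implementation, and it leaves out threading and blocking entirely.

```ruby
# Hypothetical sketch of foreground-before-background ordering.
# Not thread-safe; the real queue is guarded by a mutex.
class TwoLaneSketch
  def initialize
    @fg = []
    @bg = []
  end

  # Queue an id in the background lane unless it is already queued.
  def background(id)
    @bg << id unless @fg.include?(id) || @bg.include?(id)
  end

  # Queue an id in the foreground lane, preempting a background copy.
  def foreground(id)
    @bg.delete(id)
    @fg << id unless @fg.include?(id)
  end

  # Next id to process: foreground lane first, then background.
  def shift
    @fg.empty? ? @bg.shift : @fg.shift
  end
end

lanes = TwoLaneSketch.new
lanes.background(:a)
lanes.background(:b)
lanes.foreground(:b)  # moves :b ahead of :a
lanes.foreground(:c)
order = [lanes.shift, lanes.shift, lanes.shift]  # => [:b, :c, :a]
```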

#idle? ⇒ Boolean

Returns true if there is nothing queued and no threads are running.

Returns:

  • (Boolean)


# File 'lib/throttle-queue/single-process.rb', line 41

def idle?
  @state == :idle
end

#shutdown ⇒ Object

Signals the queue to stop processing and shut down.

Items still in the queue are dropped. Any item currently in flight will finish.



# File 'lib/throttle-queue/single-process.rb', line 35

def shutdown
  @queue.shutdown
  @pausing.signal
end

#wait(timeout = nil) ⇒ Object

Blocks the calling thread while the queue processes work.

Returns when the timeout expires or when the queue returns to the idle state, whichever comes first. With no timeout, it blocks until the queue is idle.



# File 'lib/throttle-queue/single-process.rb', line 48

def wait(timeout = nil)
  @mutex.synchronize {
    @idle.wait(@mutex, timeout) unless idle?
  }
end
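
The same wait pattern can be tried in isolation with the standard library. IdleGate and its finish method are hypothetical names for this sketch, and returning idle? from wait is an addition made here so the caller can tell a timeout from completion.

```ruby
# Hypothetical sketch of the wait/idle handshake, standard library only.
class IdleGate
  def initialize
    @mutex = Mutex.new
    @idle = ConditionVariable.new
    @state = :busy
  end

  def idle?
    @state == :idle
  end

  # Stands in for the processing thread finishing its last item.
  def finish
    @mutex.synchronize {
      @state = :idle
      @idle.broadcast
    }
  end

  # Mirrors the listing above, then reports whether the gate is idle.
  def wait(timeout = nil)
    @mutex.synchronize {
      @idle.wait(@mutex, timeout) unless idle?
    }
    idle?
  end
end

gate = IdleGate.new
gate.wait(0.05)                                # false: timed out while busy
worker = Thread.new { sleep 0.05; gate.finish }
gate.wait                                      # blocks until finish signals
worker.join
gate.idle?                                     # true
```

Because the state is checked while holding the mutex, a finish call that lands before wait is not lost: wait sees the idle state and returns without blocking.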