Class: ThrottleQueue::MultiProcess
- Inherits:
-
Object
- Object
- ThrottleQueue::MultiProcess
- Defined in:
- lib/throttle-queue/multi-process.rb
Overview
ThrottleQueue::MultiProcess is a wrapper around ThrottleQueue that shares the queue between multiple processes.
Example:
q = ThrottleQueue::MultiProcess 3
files.each {|file|
q.background(file) {|id|
fetch file
}
}
Instance Method Summary collapse
-
#background(id, &block) ⇒ Object
Adds work to the queue to run in the background, and returns immediately.
-
#foreground(id, &block) ⇒ Object
Adds work to the queue ahead of all background work, and blocks until the given block has been called.
-
#idle? ⇒ Boolean
Returns true if there is nothing queued and no threads are running.
-
#initialize(limit, name = 'ThrottleQueue') ⇒ MultiProcess
constructor
Creates a new ThrottleQueue::MultiProcess with the given rate limit (per second).
-
#shutdown ⇒ Object
Signals the queue to stop processing and shutdown.
-
#wait(timeout = nil) ⇒ Object
Blocks the calling thread while the queue processes work.
Constructor Details
#initialize(limit, name = 'ThrottleQueue') ⇒ MultiProcess
Creates a new ThrottleQueue::MultiProcess with the given rate limit (per second).
If this is the first instace of the shared queue, it becomes the master queue and starts a DRbServer instace. If a DRbServer is already running, it connects to the queue as a remote DRbObject.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/throttle-queue/multi-process.rb', line 22 def initialize(limit, name = 'ThrottleQueue') tmp = "/tmp/#{name}.sock" FileUtils.touch tmp File.open(tmp, 'r+') {|f| f.flock File::LOCK_EX begin port = f.read.to_i if port == 0 @queue = ThrottleQueue.new(limit) @drb = DRb.start_service nil, @queue f.seek 0, IO::SEEK_SET f.truncate 0 f.write @drb.uri[/\d+$/] f.flock File::LOCK_UN else @queue = DRbObject.new_with_uri("druby://localhost:#{port}") @queue.idle? @drb = DRb.start_service f.flock File::LOCK_UN end rescue DRb::DRbConnError f.seek 0, IO::SEEK_SET f.truncate 0 retry end } end |
Instance Method Details
#background(id, &block) ⇒ Object
Adds work to the queue to run in the background, and returns immediately.
If the block takes an argument, it will be passed the same id used to queue the work.
The block may be preempted by a foreground job started in this or another process. If not preempted, the block will run in this process.
82 83 84 |
# File 'lib/throttle-queue/multi-process.rb', line 82 def background(id, &block) @queue.background(id, &block) end |
#foreground(id, &block) ⇒ Object
Adds work to the queue ahead of all background work, and blocks until the given block has been called.
Will preempt an id of the same value in the background queue, and wait on an id of the same value already in the foreground queue.
If the block takes an argument, it will be passed the same id used to queue the work.
The block may wait on an already queued foreground job in this or another process. If so queued, this block will not run. If the block does run, it will run in this process.
98 99 100 |
# File 'lib/throttle-queue/multi-process.rb', line 98 def foreground(id, &block) @queue.foreground(id, &block) end |
#idle? ⇒ Boolean
Returns true if there is nothing queued and no threads are running
60 61 62 |
# File 'lib/throttle-queue/multi-process.rb', line 60 def idle? @queue.idle? end |
#shutdown ⇒ Object
Signals the queue to stop processing and shutdown.
The DRbServer is shutdown in either the master process or any client process.
54 55 56 57 |
# File 'lib/throttle-queue/multi-process.rb', line 54 def shutdown @queue.shutdown @drb.stop_service if @drb end |
#wait(timeout = nil) ⇒ Object
Blocks the calling thread while the queue processes work.
Returns after the timeout has expired, or after the queue returns to the idle state.
67 68 69 70 71 72 |
# File 'lib/throttle-queue/multi-process.rb', line 67 def wait(timeout = nil) begin @queue.wait(timeout) rescue DRb::DRbConnError end end |