Class: Chantier::ThreadPool
- Inherits: Object
- Defined in: lib/thread_pool.rb
Overview
Allows you to spin off a pool of at most N threads, where N is the number of slots given to the constructor. You can then enqueue tasks to be executed within that pool. When all slots are full, the caller blocks until a slot becomes available.
manager = ThreadPool.new(4) # 4 slots
jobs_hose.each_job do |job|
  # this call will block until a slot becomes available
  manager.fork_task do # this block runs in a thread
    Churner.new(job).churn
  end
  manager.still_running? # => most likely true
end
manager.block_until_complete! # blocks until all the threads have terminated
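The blocking behaviour described above can be sketched without the gem. This is an illustration only, not Chantier's implementation: it swaps the slot table for a `SizedQueue` used as a counting semaphore, and the class `BoundedRunner` with its `run` and `wait_all` methods is hypothetical.

```ruby
# Hypothetical sketch, not Chantier's implementation: runs at most `slots`
# blocks at once and blocks the caller while every slot is busy.
class BoundedRunner
  def initialize(slots)
    raise ArgumentError, "Need at least 1 slot" unless slots.to_i > 0
    # Each token in the queue stands for one free slot
    @tokens = SizedQueue.new(slots.to_i)
    slots.to_i.times { @tokens.push(:slot) }
    @threads = []
    @threads_lock = Mutex.new
  end

  # Waits for a free slot, then runs blk in a new thread
  def run(&blk)
    @tokens.pop # blocks while no token is left
    thread = Thread.new do
      begin
        blk.call
      ensure
        @tokens.push(:slot) # hand the slot back, even if blk raised
      end
    end
    @threads_lock.synchronize { @threads << thread }
    thread
  end

  # Joins every thread started so far (join re-raises a block's exception)
  def wait_all
    @threads_lock.synchronize { @threads.dup }.each(&:join)
  end
end
```

A caller that invokes `run` while all tokens are taken simply waits inside `SizedQueue#pop`, which matches the contract `fork_task` documents. Unlike `ThreadPool`, the sketch has no failure policy and keeps every `Thread` it started, so it only suits short demonstrations.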
If you have a finite Enumerable at hand, you can also launch it into the ThreadPool, like so:
manager = ThreadPool.new(4) # 4 slots
manager.map_fork(job_tickets) do |job_ticket|
  # this block runs in a thread
  Churner.new(job_ticket).churn
  # ...
end
Could be rewritten using Threads if operation on JVM/Rubinius becomes feasible.
Constant Summary
- SCHEDULER_SLEEP_SECONDS = (1.0 / 500)
  The manager uses loops in a few places. Sleeping briefly (2 ms) in each pass of those loops yields control back to the OS scheduler, which keeps the CPU usage of the managing process low. With a bare loop {}, MRI would saturate a whole core and not let go of it until the loop returns.
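A minimal sketch of the polling pattern this constant supports, assuming a hypothetical `wait_until` helper: the condition is re-checked in a loop, and every miss sleeps for one 2 ms interval instead of spinning.

```ruby
# Same value as the documented constant: 1/500 s, i.e. 2 milliseconds
SCHEDULER_SLEEP_SECONDS = (1.0 / 500)

# Hypothetical helper: polls the block until it returns a truthy value,
# sleeping between checks so the loop does not saturate a CPU core.
# Returns the number of naps it took.
def wait_until
  naps = 0
  until yield
    sleep SCHEDULER_SLEEP_SECONDS # give control back to the OS scheduler
    naps += 1
  end
  naps
end

worker = Thread.new { sleep 0.05 }
naps = wait_until { !worker.alive? } # number of 2 ms naps taken
```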
Instance Method Summary
- #block_until_complete! ⇒ Object
  Analogous to Process.wait or Process.waitall: blocks until all of the threads have terminated.
- #fork_task(&blk) ⇒ Object
  Runs the given block in a thread.
- #fork_task_in_all_slots(&blk) ⇒ Object
  Launches copies of the given task in all available slots of this pool.
- #initialize(num_threads, failure_policy: Chantier::FailurePolicies::None.new) ⇒ ThreadPool (constructor)
  Initializes a new ThreadPool with the given number of workers.
- #map_fork(arguments_per_job, &blk) ⇒ Object
  Distributes the elements of the given Enumerable to parallel workers, N workers at a time.
- #still_running? ⇒ Boolean
  Tells whether any threads are still churning.
Constructor Details
#initialize(num_threads, failure_policy: Chantier::FailurePolicies::None.new) ⇒ ThreadPool
Initializes a new ThreadPool with the given number of workers. If a failure_policy is given, fork_task will raise the last recorded exception once that policy reports its limit as reached (for instance, when more than N of the spawned threads have raised during execution).
# File 'lib/thread_pool.rb', line 39
def initialize(num_threads, failure_policy: Chantier::FailurePolicies::None.new)
  raise "Need at least 1 slot, given #{num_threads.to_i}" unless num_threads.to_i > 0
  @threads = [nil] * num_threads.to_i
  @semaphore = Mutex.new
  @failure_policy = Chantier::FailurePolicies::MutexWrapper.new(failure_policy)
  @failure_policy.arm!
  # Information on the last exception that happened
  @last_representative_exception = nil
end
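The constructor only arms the policy; the check itself happens at the top of fork_task. A minimal synchronous sketch of that contract, assuming a hypothetical `MaxFailuresPolicy` (not one of Chantier's classes; only the `arm!` and `limit_reached?` names come from the code above) and a hypothetical `GuardedSubmitter` that runs tasks inline in place of the pool:

```ruby
# Hypothetical policy, not part of Chantier: the limit is reached once
# more than `max_failures` failures have been recorded.
class MaxFailuresPolicy
  def initialize(max_failures)
    @max_failures = max_failures
    @lock = Mutex.new
    arm!
  end

  # Start (or restart) counting failures from zero
  def arm!
    @lock.synchronize { @failures = 0 }
  end

  def failure!
    @lock.synchronize { @failures += 1 }
  end

  def limit_reached?
    @lock.synchronize { @failures > @max_failures }
  end
end

# Hypothetical stand-in for the pool that runs tasks inline (no threads),
# mirroring the guard at the top of fork_task.
class GuardedSubmitter
  def initialize(policy)
    @policy = policy
    @last_representative_exception = nil
  end

  def submit
    # Refuse new work by re-raising a representative earlier failure
    raise @last_representative_exception if @policy.limit_reached?
    begin
      yield
    rescue StandardError => e
      @last_representative_exception = e
      @policy.failure!
    end
  end
end
```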
Instance Method Details
#block_until_complete! ⇒ Object
Analogous to Process.wait or Process.waitall: blocks until all of the threads have terminated.
# File 'lib/thread_pool.rb', line 112
def block_until_complete!
  @threads.map do |e|
    if e.respond_to?(:join) && e.alive?
      e.join
    end
  end
end
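A standalone sketch of the same join loop over a slot table in which `nil` marks a free slot. The names `slots` and `join_all` are illustrative; the sketch uses `each` rather than `map` because the collected return value is never used.

```ruby
# Join every live thread in a slot table; nil (free) slots are skipped
def join_all(slots)
  slots.each do |t|
    t.join if t.respond_to?(:join) && t.alive?
  end
end

slots = [nil] * 3
slots[0] = Thread.new { sleep 0.02 }
slots[2] = Thread.new { sleep 0.01 }
join_all(slots) # returns only after both threads have finished
```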
#fork_task(&blk) ⇒ Object
Run the given block in a thread. This method will block the thread it is called from until a slot in the thread table becomes free.
# File 'lib/thread_pool.rb', line 76
def fork_task(&blk)
  if @failure_policy.limit_reached?
    raise @last_representative_exception
  end

  destination_slot_idx = nil

  # Try to find a slot in the thread table where this job can go
  catch :_found do
    loop do
      @semaphore.synchronize do
        if destination_slot_idx = @threads.index(nil)
          @threads[destination_slot_idx] = true # occupy it
          throw :_found
        end
      end
      sleep SCHEDULER_SLEEP_SECONDS # Breathing room
    end
  end

  # No need to lock this because we already reserved that slot
  @threads[destination_slot_idx] = Thread.new do
    # Run the given block
    run_block_with_exception_protection(&blk)
    # ...and remove that thread from the thread table
    @semaphore.synchronize { @threads[destination_slot_idx] = nil }
  end
end
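The reservation logic can be reproduced in a self-contained sketch. `SlotTable` is a hypothetical name and this is not the gem's code. One deliberate difference: the sketch creates the thread while still holding the mutex, so the thread's own cleanup (which takes the same mutex) cannot clear the slot before the `Thread` object has been stored in it.

```ruby
# Hypothetical, self-contained sketch of the slot-table technique.
class SlotTable
  POLL_SECONDS = 1.0 / 500 # same value as SCHEDULER_SLEEP_SECONDS

  def initialize(size)
    raise ArgumentError, "Need at least 1 slot" unless size.to_i > 0
    @slots = [nil] * size.to_i # nil marks a free slot
    @lock = Mutex.new
  end

  # Blocks the caller until a slot is free, then runs blk in a thread
  # that occupies that slot. Returns the Thread.
  def fork_task(&blk)
    loop do
      @lock.synchronize do
        idx = @slots.index(nil)
        if idx
          # Created under the lock: the ensure clause below must take the
          # same lock, so it cannot free the slot before it is stored.
          @slots[idx] = Thread.new do
            begin
              blk.call
            ensure
              @lock.synchronize { @slots[idx] = nil }
            end
          end
          return @slots[idx] # synchronize releases the lock on return
        end
      end
      sleep POLL_SECONDS # breathing room before polling again
    end
  end

  # Waits for the threads currently occupying slots
  def join_all
    @lock.synchronize { @slots.compact }.each(&:join)
  end
end
```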
#fork_task_in_all_slots(&blk) ⇒ Object
Launch copies of the given task in all available slots for this Pool.
# File 'lib/thread_pool.rb', line 52
def fork_task_in_all_slots(&blk)
  # Array has no #times; launch one copy per slot
  @threads.length.times { fork_task(&blk) }
end
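`Array` does not define `#times`, so the number of copies has to come from the slot table's `length`. A minimal stand-alone sketch of "one copy per slot", in which plain `Thread.new` calls stand in for `fork_task` and `slots` is an illustrative slot table:

```ruby
slots = [nil] * 4 # slot table with four free slots
started = []
lock = Mutex.new

# One copy of the task per slot; each copy records its slot number
threads = slots.length.times.map do |slot_number|
  Thread.new { lock.synchronize { started << slot_number } }
end
threads.each(&:join)
```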
#map_fork(arguments_per_job, &blk) ⇒ Object
Distributes the elements in the given Enumerable to parallel workers, N workers at a time. The method will return once all the workers for all the elements of the Enumerable have terminated.
pool = ThreadPool.new(5)
pool.map_fork(array_of_urls) do |single_url|
  response = Faraday.get(single_url)
  # ...
end
# File 'lib/thread_pool.rb', line 66
def map_fork(arguments_per_job, &blk)
  arguments_per_job.each do |single_block_argument|
    fork_task { yield(single_block_argument) }
  end
  block_until_complete!
end
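For comparison, a hypothetical `bounded_each` helper sketches the same contract (call the block once per element, at most N at a time, return once everything is done) with a different technique: a fixed set of worker threads pulling elements from a closed `Queue`, instead of one thread per element placed in a slot table.

```ruby
# Hypothetical helper, not Chantier's API. Assumes no element is nil or
# false, because a nil from Queue#pop is used as the "queue drained" signal.
def bounded_each(items, workers, &blk)
  raise ArgumentError, "Need at least 1 worker" unless workers > 0
  queue = Queue.new
  items.each { |item| queue << item }
  queue.close # once empty, pop returns nil instead of blocking

  threads = Array.new(workers) do
    Thread.new do
      while (item = queue.pop)
        blk.call(item)
      end
    end
  end
  threads.each(&:join) # return only after every element was processed
  nil
end
```

The worker-queue variant starts exactly `workers` threads regardless of input size, whereas `map_fork` starts one thread per element and relies on `fork_task` to throttle them.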
#still_running? ⇒ Boolean
Tells whether any threads are still churning.
# File 'lib/thread_pool.rb', line 106
def still_running?
  @threads.any? { |e| e && e.respond_to?(:alive?) && e.alive? }
end
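The predicate can be exercised on its own. In this sketch `slots` is an illustrative slot table and the top-level `still_running?` method takes it as an argument; a `Queue` keeps the worker alive until it is released, so the results before and after are deterministic.

```ruby
# Same check as above, applied to an explicit slot table
def still_running?(slots)
  slots.any? { |e| e && e.respond_to?(:alive?) && e.alive? }
end

slots = [nil, nil]
gate = Queue.new
slots[1] = Thread.new { gate.pop } # parked until something is pushed

running_before = still_running?(slots) # the worker is blocked, so alive
gate << :go
slots[1].join
running_after = still_running?(slots) # the worker has finished
```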