Class: ParallelTasker
- Inherits:
-
Object
- Object
- ParallelTasker
- Defined in:
- lib/parallel_tasker.rb
Overview
Run tasks in parallel threads
Instance Method Summary collapse
-
#add_task(id, &task) ⇒ Object
(also: #<<)
Add task to be executed.
-
#initialize(limit) ⇒ ParallelTasker
constructor
Set max number of parallel threads.
-
#run ⇒ Object
Execute all tasks in separate threads, with maximum asked limit of parallel threads.
-
#task(id) ⇒ Object
Return block for task with given id.
Constructor Details
#initialize(limit) ⇒ ParallelTasker
Set max number of parallel threads
7 8 9 10 |
# File 'lib/parallel_tasker.rb', line 7 def initialize limit @limit = limit @tasks = {} end |
Instance Method Details
#add_task(id, &task) ⇒ Object Also known as: <<
Add task to be executed.
13 14 15 |
# File 'lib/parallel_tasker.rb', line 13 def add_task id, &task @tasks[id] = task end |
#run ⇒ Object
Execute all tasks in separate threads, with maximum asked limit of parallel threads. Returns a Hash with all given id as keys, and its value are threads themselves. User can run Thread#status to see if it terminated with an exception (nil) or not (false), and Thread#value to get either its return value or returned exception.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/parallel_tasker.rb', line 30 def run @threads = {} @limit = @tasks.size if @limit > @tasks.size pending_ids = @tasks.keys @running_ids = [] completed_ids = [] # start initial batch pending_ids.shift(@limit).each{|id| new_thread(id)} # wait for termination twait = ThreadsWait.new(*running_threads) twait.all_waits do |finished_thread| # update arrays completed_id = @threads.key(finished_thread) @running_ids.delete completed_id completed_ids << completed_id # start new thread if available and below limit if not pending_ids.empty? and @running_ids.size < @limit new_id = pending_ids.shift new_thread new_id twait.join_nowait *running_threads end end @threads end |
#task(id) ⇒ Object
Return block for task with given id
20 21 22 |
# File 'lib/parallel_tasker.rb', line 20 def task id @tasks[id] end |