Class: ParallelTasker
- Inherits:
-
Object
- Object
- ParallelTasker
- Defined in:
- lib/parallel_tasker.rb
Overview
Run tasks in parallel threads
Defined Under Namespace
Classes: DoubleRun
Instance Method Summary collapse
-
#add_task(id, &task) ⇒ Object
(also: #<<)
Add task to be executed.
-
#initialize(concurrency) ⇒ ParallelTasker
constructor
Set max number of parallel threads.
-
#run ⇒ Object
Execute all tasks in separate threads, with maximum asked concurrency of parallel threads.
-
#task(id) ⇒ Object
Return block for task with given id.
Constructor Details
#initialize(concurrency) ⇒ ParallelTasker
Set max number of parallel threads. Yields self to block, if given. Unless block calls #run, it is called automatically.
9 10 11 12 13 14 15 16 17 |
# File 'lib/parallel_tasker.rb', line 9 def initialize concurrency @concurrency = concurrency @tasks = {} @already_run = false if block_given? yield self self.run unless @already_run end end |
Instance Method Details
#add_task(id, &task) ⇒ Object Also known as: <<
Add task to be executed.
20 21 22 |
# File 'lib/parallel_tasker.rb', line 20 def add_task id, &task @tasks[id] = task end |
#run ⇒ Object
Execute all tasks in separate threads, with maximum asked concurrency of parallel threads. Returns a Hash with all given id as keys, and its value are threads themselves. User can use Thread class methods to verify each task state, such as Thread#join.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/parallel_tasker.rb', line 37 def run raise DoubleRun.new('#run called more than one time') if @already_run @already_run = true @threads = {} @concurrency = @tasks.size if @concurrency > @tasks.size pending_ids = @tasks.keys @already_running_ids = [] completed_ids = [] # start initial batch pending_ids.shift(@concurrency).each{|id| new_thread(id)} # wait for termination twait = ThreadsWait.new(*running_threads) twait.all_waits do |finished_thread| # update arrays completed_id = @threads.key(finished_thread) @already_running_ids.delete completed_id completed_ids << completed_id # start new thread if available and below concurrency if not pending_ids.empty? and @already_running_ids.size < @concurrency new_id = pending_ids.shift new_thread new_id twait.join_nowait *running_threads end end @threads end |
#task(id) ⇒ Object
Return block for task with given id
27 28 29 |
# File 'lib/parallel_tasker.rb', line 27 def task id @tasks[id] end |