Class: ParallelTasker

Inherits:
Object
  • Object
show all
Defined in:
lib/parallel_tasker.rb

Overview

Run tasks in parallel threads

Instance Method Summary collapse

Constructor Details

#initialize(limit) ⇒ ParallelTasker

Set max number of parallel threads



7
8
9
10
# File 'lib/parallel_tasker.rb', line 7

# Create a tasker with no registered tasks.
#
# @param limit [Integer] maximum number of threads allowed to run in parallel
def initialize(limit)
  @limit = limit
  # Registered task blocks, keyed by the id they were added under.
  @tasks = Hash.new
end

Instance Method Details

#add_task(id, &task) ⇒ Object Also known as: <<

Add task to be executed.



13
14
15
# File 'lib/parallel_tasker.rb', line 13

# Register a block to be executed under the given id. Registering a second
# block with an id that is already present replaces the earlier block.
#
# @param id [Object] key identifying the task
# @param task [Proc] block to run for this task
# @return [Proc, nil] the block that was stored
def add_task(id, &task)
  @tasks.store(id, task)
end

#run ⇒ Object

Execute all tasks in separate threads, running at most the requested limit of threads in parallel. Returns a Hash whose keys are the given task ids and whose values are the corresponding threads. Callers can use Thread#status to see whether a thread terminated with an exception (nil) or normally (false), and Thread#value to get either its return value or the exception.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/parallel_tasker.rb', line 30

# Execute all registered tasks in separate threads, keeping at most the
# configured limit of threads running at the same time.
#
# Threads are started through new_thread and the currently running threads
# are read from running_threads; both are helpers defined elsewhere in this
# class.
#
# @return [Hash] the threads created for this run, keyed by task id
def run
  @threads = {}
  # Clamp into a local instead of overwriting @limit, so the limit given to
  # the constructor still applies to later runs with more tasks.
  limit = [@limit, @tasks.size].min
  pending_ids = @tasks.keys
  @running_ids = []
  # start initial batch
  pending_ids.shift(limit).each { |id| new_thread(id) }
  # wait for termination
  twait = ThreadsWait.new(*running_threads)
  twait.all_waits do |finished_thread|
    # the finished task no longer counts against the limit
    @running_ids.delete(@threads.key(finished_thread))
    # start a new thread only if one is pending and we are below the limit
    next if pending_ids.empty? || @running_ids.size >= limit
    new_id = pending_ids.shift
    new_thread(new_id)
    # Wait only on the thread just started; the others are already being
    # waited on, and joining them again would report each of them more than
    # once. Relies on new_thread registering the thread under its id in
    # @threads, the same mapping the @threads.key lookup above depends on.
    twait.join_nowait(@threads[new_id])
  end
  @threads
end

#task(id) ⇒ Object

Return block for task with given id



20
21
22
# File 'lib/parallel_tasker.rb', line 20

# Look up the block registered for a task.
#
# @param id [Object] key the task was added under
# @return [Proc, nil] whatever is stored in @tasks for +id+
def task(id)
  @tasks[id]
end