Class: ParallelTasker

Inherits:
Object
  • Object
show all
Defined in:
lib/parallel_tasker.rb

Overview

Run tasks in parallel threads

Defined Under Namespace

Classes: DoubleRun

Instance Method Summary collapse

Constructor Details

#initialize(concurrency) ⇒ ParallelTasker

Sets the maximum number of parallel threads. Yields self to the block, if one is given; unless the block calls #run itself, #run is called automatically once the block returns.



9
10
11
12
13
14
15
16
17
# File 'lib/parallel_tasker.rb', line 9

# Builds a tasker limited to +concurrency+ simultaneous threads.
#
# When a block is supplied, the new instance is yielded to it; afterwards
# #run is invoked unless @already_run has been set in the meantime.
def initialize(concurrency)
  @concurrency = concurrency
  @tasks = {}
  @already_run = false
  return unless block_given?

  yield self
  run unless @already_run
end

Instance Method Details

#add_task(id, &task) ⇒ Object Also known as: <<

Add task to be executed.



20
21
22
# File 'lib/parallel_tasker.rb', line 20

# Registers the given block under +id+ in @tasks; registering the same id
# again replaces the previous block. Returns the stored block.
def add_task(id, &task)
  @tasks.store(id, task)
end

#run ⇒ Object

Executes all tasks in separate threads, running at most the requested number of threads in parallel. Returns a Hash whose keys are the given task ids and whose values are the corresponding threads. Callers can use Thread methods, such as Thread#join, to check the state of each task.

Raises:

  • (DoubleRun) — if #run is called more than once



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/parallel_tasker.rb', line 37

# Executes every registered task in its own thread, keeping at most
# @concurrency of them running at once.
#
# Returns the @threads Hash, which this method uses to map task ids to their
# threads (see the @threads.key lookup below), so callers can inspect or join
# individual threads.
#
# Raises DoubleRun when invoked a second time on the same instance.
def run
  raise DoubleRun, '#run called more than one time' if @already_run

  @already_run = true
  @threads = {}
  # Never ask for more slots than there are tasks.
  @concurrency = @tasks.size if @concurrency > @tasks.size
  pending_ids = @tasks.keys
  @already_running_ids = []
  # Start the initial batch.
  pending_ids.shift(@concurrency).each { |id| new_thread(id) }
  # Wait for termination, refilling a slot whenever a thread finishes.
  twait = ThreadsWait.new(*running_threads)
  twait.all_waits do |finished_thread|
    @already_running_ids.delete(@threads.key(finished_thread))
    # Nothing left to start, or no free slot: just keep waiting.
    next if pending_ids.empty? || @already_running_ids.size >= @concurrency

    new_id = pending_ids.shift
    new_thread(new_id)
    # Watch only the thread that was just started. Re-registering threads
    # that are already being waited on would make them be reported as
    # finished more than once.
    # NOTE(review): assumes new_thread stores the thread under
    # @threads[new_id], consistent with the @threads.key lookup above.
    twait.join_nowait(@threads[new_id])
  end
  @threads
end

#task(id) ⇒ Object

Return block for task with given id



27
28
29
# File 'lib/parallel_tasker.rb', line 27

# Looks up the block stored under +id+ in @tasks.
def task(id)
  @tasks[id]
end