Class: SimpleThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/simple-thread-pool.rb

Class Method Summary collapse

Class Method Details

.give_work(*args, &block) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/simple-thread-pool.rb', line 36

def give_work( *args,&block)
  min = -1
  min_key = nil
  @thread_work_queues.each_pair do |k,v|
    if min_key == nil or v.size < min
      min = v.size
      min_key = k
    end
  end
  @thread_work_queues[min_key]
  @locks[min_key].synchronize {
    @thread_work_queues[min_key] << {:block => block, :args => args}
  }
end

.num_executingObject



59
60
61
# File 'lib/simple-thread-pool.rb', line 59

def num_executing
  @current_jobs.keys.inject(0){|a,b| @current_jobs[b] ? a+ 1 : a }
end

.queue_sizeObject



51
52
53
54
55
56
57
# File 'lib/simple-thread-pool.rb', line 51

def queue_size
  size = 0
  @thread_work_queues.each_pair do |k,v|
    size += v.size
  end
  size
end

.start_up(thread_count) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/simple-thread-pool.rb', line 3

def start_up(thread_count)
  @thread_work_queues = {}
  @locks = {}
  @threads = []
  @current_jobs = {}
  thread_count.times do |i|
    @thread_work_queues[i] = []
    @locks[i] = Mutex.new
    @threads << Thread.new(i,@thread_work_queues[i],@locks[i],@current_jobs) do |index,queue,lock,executing_jobs|
      
      while true
        if queue.size > 0
          #do the mutex
          job = nil
          lock.synchronize { 
            job = queue.slice!(0) 
          }
          if job
            executing_jobs[index] = job
            job[:block].call(job[:args])
            executing_jobs.delete(index)
          end
          
        end
        #keep looking for work until the parent pid dies
        #puts "Worker-#{index} out of stuff to do"
        sleep(0.1)
      end
         
    end
  end
end