Class: ASIR::ThreadPool

Inherits:
Object
  • Object
show all
Includes:
AdditionalData, Initialization
Defined in:
lib/asir/thread_pool.rb

Defined Under Namespace

Classes: Work, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Constructor Details

#initialize(*args) ⇒ ThreadPool

Returns a new instance of ThreadPool.



14
15
16
17
18
19
20
21
22
23
24
# File 'lib/asir/thread_pool.rb', line 14

def initialize *args
  super
  @thread_class ||= ::Thread
  @workers_mutex  = Mutex.new
  @work_mutex     = Mutex.new
  @workers      ||= [ ]
  @work_queue   ||= Queue.new
  @run = false
  @work_id = @worker_id = 0
  @time_0 ||= Time.now
end

Instance Attribute Details

#n_workersObject

Returns the value of attribute n_workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def n_workers
  @n_workers
end

#runObject

Returns the value of attribute run.



12
13
14
# File 'lib/asir/thread_pool.rb', line 12

def run
  @run
end

#thread_classObject

Returns the value of attribute thread_class.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def thread_class
  @thread_class
end

#verboseObject

Returns the value of attribute verbose.



11
12
13
# File 'lib/asir/thread_pool.rb', line 11

def verbose
  @verbose
end

#work_queueObject

Returns the value of attribute work_queue.



10
11
12
# File 'lib/asir/thread_pool.rb', line 10

def work_queue
  @work_queue
end

#workersObject

Returns the value of attribute workers.



9
10
11
# File 'lib/asir/thread_pool.rb', line 9

def workers
  @workers
end

Instance Method Details

#join(*args) ⇒ Object



133
134
135
136
137
138
139
# File 'lib/asir/thread_pool.rb', line 133

def join *args
  until @workers.empty?
    @workers.each do | worker |
      worker.join(*args)
    end
  end
end

#kill!(*args) ⇒ Object



122
123
124
125
126
127
128
129
130
131
# File 'lib/asir/thread_pool.rb', line 122

def kill! *args
  log! "kill!"
  @run = false
  @workers_mutex.synchronize do
    @workers.each do | worker |
      worker.kill! *args
    end
  end
  self
end

#log!(msg = nil) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/asir/thread_pool.rb', line 100

def log! msg = nil
  return unless @verbose
  msg ||= yield
  @time_1 = Time.now
  $stderr.puts "  #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}"
  self
end

#new(&blk) ⇒ Object

Returns a new Work object.



27
28
29
30
31
32
33
34
35
36
37
# File 'lib/asir/thread_pool.rb', line 27

def new &blk
  work_id = @work_mutex.synchronize do
    @work_id += 1
  end
  work = Work.new(:block => blk, :work_id => work_id)
  work_created! work
  @work_queue.enq(work)
  @run = true
  start_workers! if @auto_start_workers
  work
end

#start_workers!Object

Keep a list of workers busy.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/asir/thread_pool.rb', line 40

def start_workers!
  return nil unless @run
  workers_size = @workers.size
  want_n = [ n_workers, @work_queue.size ].min
  want_n = n_workers if want_n > n_workers
  start_n = want_n - workers_size
  start_n = 0 if start_n < 0
  return unless start_n > 0
  log! { "start_workers! #{start_n}" }
  start_n.times do
    thread_class.new do
      worker_id = @workers_mutex.synchronize do
        @worker_id += 1
      end
      worker = Worker.new(:thread_pool => self, :worker_id => worker_id)
      worker_created! worker
      begin
        worker_starting! worker
        @workers_mutex.synchronize do
          @workers << worker
        end
        worker.run!
#          rescue ::Exception => exc
#            log! "ERROR: #{exc.inspect}\n#{exc.backtrace * "\n"}"
#            raise exc
      ensure
        @workers_mutex.synchronize do
          @workers.delete(worker)
        end
        worker_stopping! worker
      end
    end
  end
  self
end

#stop!Object



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/asir/thread_pool.rb', line 108

def stop!
  log! "stop!"
  @run = false
  # Ask each current worker to :stop!
  @workers_mutex.synchronize do
    @workers.each do | w |
      @work_queue.enq :stop!
    end
  end
  # Just incase.
  @work_queue.enq :stop!
  self
end

#work_created!(work) ⇒ Object



88
89
90
# File 'lib/asir/thread_pool.rb', line 88

def work_created! work
  log! { "work_created! #{work.inspect}" }
end

#work_starting!(work) ⇒ Object



92
93
94
# File 'lib/asir/thread_pool.rb', line 92

def work_starting! work
  log! { "work_starting! #{work.inspect} #{work.worker.inspect}" }
end

#work_stopping!(work) ⇒ Object



96
97
98
# File 'lib/asir/thread_pool.rb', line 96

def work_stopping! work
  log! { "work_stopping! #{work.inspect}" }
end

#worker_created!(worker) ⇒ Object



76
77
78
# File 'lib/asir/thread_pool.rb', line 76

def worker_created! worker
  log! { "worker_created! #{worker.inspect}" }
end

#worker_starting!(worker) ⇒ Object



80
81
82
# File 'lib/asir/thread_pool.rb', line 80

def worker_starting! worker
  log! { "worker_starting! #{worker}" }
end

#worker_stopping!(worker) ⇒ Object



84
85
86
# File 'lib/asir/thread_pool.rb', line 84

def worker_stopping! worker
  log! { "worker_stopping! #{worker}" }
end