Class: ASIR::ThreadPool
- Inherits: Object
  - Object
    - ASIR::ThreadPool
- Includes:
- AdditionalData, Initialization
- Defined in:
- lib/asir/thread_pool.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#n_workers ⇒ Object
Returns the value of attribute n_workers.
-
#run ⇒ Object
Returns the value of attribute run.
-
#thread_class ⇒ Object
Returns the value of attribute thread_class.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
-
#work_queue ⇒ Object
Returns the value of attribute work_queue.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize(*args) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #join(*args) ⇒ Object
- #kill!(*args) ⇒ Object
- #log!(msg = nil) ⇒ Object
-
#new(&blk) ⇒ Object
Returns a new Work object.
-
#start_workers! ⇒ Object
Keep a list of workers busy.
- #stop! ⇒ Object
- #work_created!(work) ⇒ Object
- #work_starting!(work) ⇒ Object
- #work_stopping!(work) ⇒ Object
- #worker_created!(worker) ⇒ Object
- #worker_starting!(worker) ⇒ Object
- #worker_stopping!(worker) ⇒ Object
Methods included from AdditionalData
#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included
Constructor Details
#initialize(*args) ⇒ ThreadPool
Returns a new instance of ThreadPool.
14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/asir/thread_pool.rb', line 14
#
# Sets up a pool in the stopped state (@run is false).
#
# Arguments are forwarded untouched to +super+. Attributes assigned with
# +||=+ keep any value that is already present once +super+ returns
# (presumably set from +args+ by the Initialization mixin -- confirm there).
# The two mutexes, the run flag and the id counters are always reset.
def initialize(*args)
  super

  # Overridable collaborators: only filled in when still unset.
  @thread_class ||= ::Thread
  @workers      ||= [ ]
  @work_queue   ||= Queue.new
  @time_0       ||= Time.now  # reference point for the elapsed time printed by log!

  # Internal state: always fresh.
  @workers_mutex = Mutex.new  # guards @workers and @worker_id
  @work_mutex    = Mutex.new  # guards @work_id
  @run       = false
  @worker_id = 0
  @work_id   = 0
end
Instance Attribute Details
#n_workers ⇒ Object
Returns the value of attribute n_workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9
#
# Returns the value of attribute n_workers.
# start_workers! uses it as the upper bound on the number of workers it wants.
def n_workers
  @n_workers
end
#run ⇒ Object
Returns the value of attribute run.
12 13 14 |
# File 'lib/asir/thread_pool.rb', line 12
#
# Returns the value of attribute run.
# The flag is set to true by #new and to false by #stop! and #kill!.
def run
  @run
end
#thread_class ⇒ Object
Returns the value of attribute thread_class.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9
#
# Returns the value of attribute thread_class.
# start_workers! calls +new+ on it with a block to launch each worker.
def thread_class
  @thread_class
end
#verbose ⇒ Object
Returns the value of attribute verbose.
11 12 13 |
# File 'lib/asir/thread_pool.rb', line 11
#
# Returns the value of attribute verbose.
# log! emits nothing unless this value is truthy.
def verbose
  @verbose
end
#work_queue ⇒ Object
Returns the value of attribute work_queue.
10 11 12 |
# File 'lib/asir/thread_pool.rb', line 10
#
# Returns the value of attribute work_queue.
# #new enqueues work items on it; #stop! enqueues :stop! markers on it.
def work_queue
  @work_queue
end
#workers ⇒ Object
Returns the value of attribute workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9
#
# Returns the value of attribute workers.
# This is the live collection itself, not a copy; worker threads add and
# remove themselves from it under @workers_mutex.
def workers
  @workers
end
Instance Method Details
#join(*args) ⇒ Object
133 134 135 136 137 138 139 |
# File 'lib/asir/thread_pool.rb', line 133
#
# Blocks until no workers remain registered in the pool.
#
# Every argument is forwarded to each worker's +join+. The loop repeats until
# @workers is empty, so a +join+ that returns early does not end the wait.
#
# Worker threads remove themselves from @workers under @workers_mutex, so
# the list is never iterated directly here. Each pass takes a snapshot while
# holding the mutex and joins the snapshot *outside* the lock; holding the
# lock during +join+ would stop workers from deregistering and deadlock.
#
# Returns nil.
def join(*args)
  until (pending = @workers_mutex.synchronize { @workers.dup }).empty?
    pending.each { |worker| worker.join(*args) }
  end
end
#kill!(*args) ⇒ Object
122 123 124 125 126 127 128 129 130 131 |
# File 'lib/asir/thread_pool.rb', line 122
#
# Clears the run flag and sends +kill!+ (with the given arguments) to every
# registered worker while holding @workers_mutex.
#
# Returns self.
def kill!(*args)
  log! "kill!"
  @run = false
  @workers_mutex.synchronize do
    @workers.each { |w| w.kill!(*args) }
  end
  self
end
#log!(msg = nil) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/asir/thread_pool.rb', line 100
#
# Writes one diagnostic line to $stderr when @verbose is truthy.
#
# The line contains the seconds elapsed since @time_0, the process id, the
# current thread's object_id, this object and the message.
#
# msg   - the message; when nil, the block (if any) is called to build it,
#         so callers can avoid building strings while logging is off.
#
# A call with neither +msg+ nor a block logs an empty message. The previous
# unconditional +yield+ raised LocalJumpError in that case.
#
# Returns nil when logging is disabled, otherwise self.
def log!(msg = nil)
  return unless @verbose

  msg ||= (yield if block_given?)
  @time_1 = Time.now
  $stderr.puts " #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}"
  self
end
#new(&blk) ⇒ Object
Returns a new Work object.
27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/asir/thread_pool.rb', line 27
#
# Returns a new Work object.
#
# The block becomes the Work's :block and the Work receives the next
# sequential :work_id (incremented under @work_mutex). The Work is announced
# through work_created!, placed on @work_queue, and the pool's run flag is
# set. Workers are started right away only when @auto_start_workers is truthy.
def new(&blk)
  next_id = @work_mutex.synchronize { @work_id += 1 }
  job = Work.new(:block => blk, :work_id => next_id)
  work_created!(job)
  @work_queue.enq(job)
  @run = true
  start_workers! if @auto_start_workers
  job
end
#start_workers! ⇒ Object
Keep a list of workers busy.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/asir/thread_pool.rb', line 40
#
# Keep a list of workers busy.
#
# Does nothing (returns nil) unless the run flag is set. Otherwise the
# desired worker count is the smaller of n_workers and the current queue
# size; one thread is launched via thread_class for each worker missing
# relative to @workers.size. Returns nil when none are missing, else self.
#
# Each launched thread takes the next worker id under @workers_mutex, builds
# a Worker, registers it in @workers, and calls its run!. Whatever happens
# inside run!, the worker is deregistered and worker_stopping! is fired.
def start_workers!
  return nil unless @run

  active  = @workers.size
  missing = [ n_workers, @work_queue.size ].min - active
  return unless missing > 0

  log! { "start_workers! #{missing}" }
  missing.times do
    thread_class.new do
      id = @workers_mutex.synchronize { @worker_id += 1 }
      worker = Worker.new(:thread_pool => self, :worker_id => id)
      worker_created!(worker)
      begin
        worker_starting!(worker)
        @workers_mutex.synchronize { @workers << worker }
        worker.run!
      ensure
        # No rescue on purpose: exceptions propagate after this cleanup.
        @workers_mutex.synchronize { @workers.delete(worker) }
        worker_stopping!(worker)
      end
    end
  end
  self
end
#stop! ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/asir/thread_pool.rb', line 108
#
# Clears the run flag and asks the workers to stop by enqueueing :stop!
# markers on @work_queue: one per currently registered worker (counted under
# @workers_mutex), plus one extra.
#
# Returns self.
def stop!
  log! "stop!"
  @run = false

  # One marker for each worker registered right now.
  @workers_mutex.synchronize do
    @workers.each { |_worker| @work_queue.enq(:stop!) }
  end

  # One spare marker, just in case.
  @work_queue.enq(:stop!)
  self
end
#work_created!(work) ⇒ Object
88 89 90 |
# File 'lib/asir/thread_pool.rb', line 88
#
# Hook called by #new right after a Work object is built.
# Logs the work's +inspect+ output; returns whatever log! returns.
def work_created!(work)
  log! { "work_created! #{work.inspect}" }
end
#work_starting!(work) ⇒ Object
92 93 94 |
# File 'lib/asir/thread_pool.rb', line 92
#
# Hook that logs the +inspect+ output of a work item and of its +worker+.
# The caller is not visible in this file -- presumably the worker invokes it
# just before running the work.
# Returns whatever log! returns.
def work_starting!(work)
  log! { "work_starting! #{work.inspect} #{work.worker.inspect}" }
end
#work_stopping!(work) ⇒ Object
96 97 98 |
# File 'lib/asir/thread_pool.rb', line 96
#
# Hook that logs the +inspect+ output of a work item.
# The caller is not visible in this file -- presumably the worker invokes it
# once the work has finished.
# Returns whatever log! returns.
def work_stopping!(work)
  log! { "work_stopping! #{work.inspect}" }
end
#worker_created!(worker) ⇒ Object
76 77 78 |
# File 'lib/asir/thread_pool.rb', line 76
#
# Hook called by start_workers! inside the new thread, right after the
# Worker is built and before it is registered in @workers.
# Logs the worker's +inspect+ output; returns whatever log! returns.
def worker_created!(worker)
  log! { "worker_created! #{worker.inspect}" }
end
#worker_starting!(worker) ⇒ Object
80 81 82 |
# File 'lib/asir/thread_pool.rb', line 80
#
# Hook called by start_workers! just before the worker is registered in
# @workers and its run! is invoked.
# Logs the worker's +to_s+; returns whatever log! returns.
def worker_starting!(worker)
  log! { "worker_starting! #{worker}" }
end
#worker_stopping!(worker) ⇒ Object
84 85 86 |
# File 'lib/asir/thread_pool.rb', line 84
#
# Hook called from the +ensure+ section of start_workers!, after the worker
# has been removed from @workers.
# Logs the worker's +to_s+; returns whatever log! returns.
def worker_stopping!(worker)
  log! { "worker_stopping! #{worker}" }
end