Class: ASIR::ThreadPool
- Inherits:
-
Object
- Object
- ASIR::ThreadPool
- Includes:
- AdditionalData, Initialization
- Defined in:
- lib/asir/thread_pool.rb
Defined Under Namespace
Instance Attribute Summary collapse
-
#auto_start_workers ⇒ Object
Returns the value of attribute auto_start_workers.
-
#n_workers ⇒ Object
Returns the value of attribute n_workers.
-
#run ⇒ Object
Returns the value of attribute run.
-
#thread_class ⇒ Object
Returns the value of attribute thread_class.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
-
#work_queue ⇒ Object
Returns the value of attribute work_queue.
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
-
#initialize(*args) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #join(*args) ⇒ Object
- #kill!(*args) ⇒ Object
- #log!(msg = nil) ⇒ Object
-
#new(&blk) ⇒ Object
Returns a new Work object.
-
#start_workers! ⇒ Object
Keep a list of workers busy.
- #stop! ⇒ Object
- #work_created!(work) ⇒ Object
- #work_starting!(work) ⇒ Object
- #work_stopping!(work) ⇒ Object
- #worker_created!(worker) ⇒ Object
- #worker_starting!(worker) ⇒ Object
- #worker_stopping!(worker) ⇒ Object
Methods included from AdditionalData
#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included
Constructor Details
#initialize(*args) ⇒ ThreadPool
Returns a new instance of ThreadPool.
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/asir/thread_pool.rb', line 15 def initialize *args super @thread_class ||= ::Thread @workers_mutex = Mutex.new @work_mutex = Mutex.new @workers ||= [ ] @work_queue ||= Queue.new @run = false @work_id = @worker_id = 0 @time_0 ||= Time.now end |
Instance Attribute Details
#auto_start_workers ⇒ Object
Returns the value of attribute auto_start_workers.
10 11 12 |
# File 'lib/asir/thread_pool.rb', line 10 def auto_start_workers @auto_start_workers end |
#n_workers ⇒ Object
Returns the value of attribute n_workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def n_workers @n_workers end |
#run ⇒ Object
Returns the value of attribute run.
13 14 15 |
# File 'lib/asir/thread_pool.rb', line 13 def run @run end |
#thread_class ⇒ Object
Returns the value of attribute thread_class.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def thread_class @thread_class end |
#verbose ⇒ Object
Returns the value of attribute verbose.
12 13 14 |
# File 'lib/asir/thread_pool.rb', line 12 def verbose @verbose end |
#work_queue ⇒ Object
Returns the value of attribute work_queue.
11 12 13 |
# File 'lib/asir/thread_pool.rb', line 11 def work_queue @work_queue end |
#workers ⇒ Object
Returns the value of attribute workers.
9 10 11 |
# File 'lib/asir/thread_pool.rb', line 9 def workers @workers end |
Instance Method Details
#join(*args) ⇒ Object
134 135 136 137 138 139 140 |
# File 'lib/asir/thread_pool.rb', line 134 def join *args until @workers.empty? @workers.each do | worker | worker && worker.join(*args) end end end |
#kill!(*args) ⇒ Object
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/asir/thread_pool.rb', line 123 def kill! *args log! "kill!" @run = false @workers_mutex.synchronize do @workers.each do | worker | worker.kill! *args end end self end |
#log!(msg = nil) ⇒ Object
101 102 103 104 105 106 107 |
# File 'lib/asir/thread_pool.rb', line 101 def log! msg = nil return unless @verbose msg ||= yield @time_1 = Time.now $stderr.puts " #{@time_1 - @time_0} #{$$} #{Thread.current.object_id} #{self} #{msg}" self end |
#new(&blk) ⇒ Object
Returns a new Work object.
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/asir/thread_pool.rb', line 28 def new &blk work_id = @work_mutex.synchronize do @work_id += 1 end work = Work.new(:block => blk, :work_id => work_id) work_created! work @work_queue.enq(work) @run = true start_workers! if @auto_start_workers work end |
#start_workers! ⇒ Object
Keep a list of workers busy.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/asir/thread_pool.rb', line 41 def start_workers! return nil unless @run workers_size = @workers.size want_n = [ n_workers, @work_queue.size ].min want_n = n_workers if want_n > n_workers start_n = want_n - workers_size start_n = 0 if start_n < 0 return unless start_n > 0 log! { "start_workers! #{start_n}" } start_n.times do thread_class.new do worker_id = @workers_mutex.synchronize do @worker_id += 1 end worker = Worker.new(:thread_pool => self, :worker_id => worker_id) worker_created! worker begin worker_starting! worker @workers_mutex.synchronize do @workers << worker end worker.run! # rescue ::Exception => exc # log! "ERROR: #{exc.inspect}\n#{exc.backtrace * "\n"}" # raise exc ensure @workers_mutex.synchronize do @workers.delete(worker) end worker_stopping! worker end end end self end |
#stop! ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/asir/thread_pool.rb', line 109 def stop! log! "stop!" @run = false # Ask each current worker to :stop! @workers_mutex.synchronize do @workers.each do | w | @work_queue.enq :stop! end end # Just incase. @work_queue.enq :stop! self end |
#work_created!(work) ⇒ Object
89 90 91 |
# File 'lib/asir/thread_pool.rb', line 89 def work_created! work log! { "work_created! #{work.inspect}" } end |
#work_starting!(work) ⇒ Object
93 94 95 |
# File 'lib/asir/thread_pool.rb', line 93 def work_starting! work log! { "work_starting! #{work.inspect} #{work.worker.inspect}" } end |
#work_stopping!(work) ⇒ Object
97 98 99 |
# File 'lib/asir/thread_pool.rb', line 97 def work_stopping! work log! { "work_stopping! #{work.inspect}" } end |
#worker_created!(worker) ⇒ Object
77 78 79 |
# File 'lib/asir/thread_pool.rb', line 77 def worker_created! worker log! { "worker_created! #{worker.inspect}" } end |
#worker_starting!(worker) ⇒ Object
81 82 83 |
# File 'lib/asir/thread_pool.rb', line 81 def worker_starting! worker log! { "worker_starting! #{worker}" } end |
#worker_stopping!(worker) ⇒ Object
85 86 87 |
# File 'lib/asir/thread_pool.rb', line 85 def worker_stopping! worker log! { "worker_stopping! #{worker}" } end |