Class: SizedParallel::Pool
- Inherits: Object
  - Object
  - SizedParallel::Pool
- Defined in: lib/sized_parallel/pool.rb
Overview
A deliberately naive thread pool: it does everything I need and nothing more. It has no 'maximum' or 'minimum' thread count, only a single fixed number of threads. They are all started together, run in parallel, and die when no jobs are left. A pool can be reused after it has finished a batch of jobs, though.
p = Pool.new
1024.times do |i|
  p.process i do |j|
    printf "%d\n", j
    sleep 1
  end
end
p.wait
p.process ...
Instance Method Summary
- #initialize(n = Etc.nprocessors) {|self| ... } ⇒ Pool (constructor)
  Allocates a new pool.
- #process(*argv) {|*argv| ... } ⇒ self
  Registers what shall be done.
- #wait ⇒ self
  Blocks until all registered jobs have finished.
Constructor Details
#initialize(n = Etc.nprocessors) {|self| ... } ⇒ Pool
Allocates a new pool. If no block is given, it just returns the new object. Otherwise it evaluates the given block, on the expectation that the block registers some jobs with the pool, and then waits for them to finish. That is:
Pool.new do |p|
  ...
end
is a shorthand for:
p = Pool.new
begin
  ...
ensure
  p.wait
end
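The block form can be tried without the gem installed by using a small stand-in. The sketch below is only an illustration under stated assumptions: `TinyPool` is a hypothetical class written for this example, not `SizedParallel::Pool` itself, and it mirrors only the documented behavior (fixed thread count, jobs queued by `process`, drained by `wait`, with `wait` called from `ensure` when a block is given).

```ruby
require 'etc'

# Hypothetical stand-in for illustration only; NOT the library's class.
class TinyPool
  def initialize(n = Etc.nprocessors)
    raise TypeError, 'not a number' unless n.is_a?(Integer)
    raise ArgumentError, 'thread count must be positive' if n <= 0
    @n = n
    @q = Queue.new
    return unless block_given?
    begin
      yield self
    ensure
      wait # runs even if the block raises
    end
  end

  # Queue the job together with the arguments it will receive.
  def process(*argv, &job)
    @q.enq [job, *argv]
    self
  end

  # Start @n threads, let them drain the queue, then join them all.
  def wait
    Array.new(@n) do
      Thread.start do
        begin
          loop do
            job, *argv = @q.deq(true) # non-blocking; raises when empty
            job.call(*argv)
          end
        rescue ThreadError
          # Queue is empty: nothing left for this thread to do.
        end
      end
    end.each(&:join)
    self
  end
end

results = Queue.new
TinyPool.new(4) do |pool|
  8.times { |i| pool.process(i) { |j| results << j * j } }
end

# By the time the block form returns, every job has already run.
squares = Array.new(results.size) { results.pop }.sort
p squares # => [0, 1, 4, 9, 16, 25, 36, 49]
```

Because `wait` sits in an `ensure` clause, the queued jobs are still run and joined when the block exits early with an exception.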
# File 'lib/sized_parallel/pool.rb', line 64
def initialize n = Etc.nprocessors
  case n
  when Integer then
    raise ArgumentError, 'negative number makes no sense' if n <= 0
    @n = n
    @q = Queue.new
  else
    raise TypeError, 'not a number'
  end

  return unless defined? yield
  begin
    yield self
  ensure
    wait
  end
end
Instance Method Details
#process(*argv) {|*argv| ... } ⇒ self
Why on earth does this method take arguments that seem completely useless? The problem we are working around is inter-thread variable scope. For instance, it is a WRONG idea to write code like this:
1024.times { |i| Thread.start { puts i } } # WRONG WRONG WRONG WRONG
The code above can behave unexpectedly because the variable i is shared across threads and may be overwritten before a thread reads it. A valid usage is below:
1024.times { |i| Thread.start(i) {|j| puts j } }
#                            ^^^  ^^^      ^
Note the use of the newly introduced block parameter j. Exactly the same reasoning applies to this method, because it is a method that spawns (or reuses) a thread.
Registers what shall be done.
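The effect of passing arguments through `Thread.start` can be seen with the standard library alone. The following is a minimal sketch, not code from this library: it uses a `while` loop so that the counter is unambiguously a single variable shared by every thread, and a gate queue (an assumption of this example) that holds each thread back until the loop has finished, which makes the result deterministic.

```ruby
# The gate keeps every thread waiting until the loop is over.
gate = Queue.new

# Shared variable: each block closes over the one and only `i`.
i = 0
threads = []
while i < 3
  threads << Thread.start { gate.pop; i }
  i += 1
end
3.times { gate.push :go }
shared = threads.map(&:value)

# Per-thread copy: the value of `i` at spawn time is handed over as `j`.
i = 0
threads = []
while i < 3
  threads << Thread.start(i) { |j| gate.pop; j }
  i += 1
end
3.times { gate.push :go }
copied = threads.map(&:value)

p shared # => [3, 3, 3]
p copied # => [0, 1, 2]
```

Arguments given to `#process` play the same role as the argument given to `Thread.start` here: they are captured when the job is registered, not when it eventually runs.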
# File 'lib/sized_parallel/pool.rb', line 103
def process *argv
  argv.unshift Proc.new
  @q.enq argv
  return self
end
#wait ⇒ self
This method blocks until all registered jobs have finished. Once it returns, the thread pool is back in its initial state, ready to be used again.
# File 'lib/sized_parallel/pool.rb', line 113
def wait
  Array.new @n do
    Thread.start do
      failed = false
      begin
        while a = @q.deq(true) do
          # rubocop:disable Style/EmptyElse, Lint/RescueException
          begin
            job, *argv = *a
            job.call(*argv)
          rescue Exception => e
            if Thread.abort_on_exception then
              # When a job ends abnormally, we are not sure what to do for
              # it.  Can't but just follow abort_on_exception global setting
              # for now.  Should there be a better way.
              failed = e
              break
            else
              # silently ignore
            end
          end
          # rubocop:enable Style/EmptyElse, Lint/RescueException
        end
        # Reaching here indicates the abnormal end-of-job. tricky!
      rescue ThreadError
        # This ThreadError can only come from @q.deq, indicates no more
        # job(s) to call.
        Thread.exit
      end
      raise failed if failed
    end
  end.each(&:join)
  return self
end
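The worker loop above relies on the non-blocking form of `Queue#deq`, which raises `ThreadError` when the queue is empty instead of waiting. A minimal single-threaded sketch of that drain pattern, using only the standard `Queue` class (the lambda job and the `drained` flag are made up for this example):

```ruby
q = Queue.new
# Entries have the same shape as in the pool: the job first, then its arguments.
q.enq [->(x) { x * 2 }, 21]

results = []
drained = false
begin
  while (a = q.deq(true)) # true = non-blocking
    job, *argv = *a
    results << job.call(*argv)
  end
rescue ThreadError
  # Raised by deq(true) once the queue is empty: no more jobs to call.
  drained = true
end

p results # => [42]
p drained # => true
```

A blocking `deq` would leave the worker threads asleep forever once the queue ran dry, so the exception doubles as the "no more work" signal.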