Class: SizedParallel::Pool

Inherits:
Object
Defined in:
lib/sized_parallel/pool.rb

Overview

Very naive thread pool; does everything I need and nothing more. It has no 'maximum' or 'minimum' thread count, only one fixed number of threads, which are allocated at once, run in parallel, and exit when no jobs are left. You can, however, reuse a pool after it has finished a batch of work.

p = Pool.new
1024.times do |i|
  p.process i do |j|
    printf "%d\n", j
    sleep 1
  end
end
p.wait          # blocks until all 1024 jobs have run
p.process ...   # the same pool can then be reused
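To make the overview concrete, here is a minimal, self-contained sketch of the same technique: jobs are queued first, then a fixed number of threads drain the queue and exit once it is empty. NaivePool is a hypothetical name used only for illustration; this is not the gem's implementation, and unlike the real class it does no special error handling.

```ruby
require 'etc'

# Hypothetical NaivePool: a sketch of a fixed-size pool, not SizedParallel::Pool.
class NaivePool
  def initialize(n = Etc.nprocessors)
    raise ArgumentError, 'n must be positive' unless n.is_a?(Integer) && n > 0
    @n = n
    @q = Queue.new
  end

  # Queue a block together with the arguments it will be called with.
  def process(*argv, &job)
    @q << [job, *argv]
    self
  end

  # Start @n workers; each pops jobs without blocking until the queue is empty.
  def wait
    Array.new(@n) do
      Thread.start do
        begin
          loop do
            job, *argv = @q.pop(true) # raises ThreadError once the queue is empty
            job.call(*argv)
          end
        rescue ThreadError
          # queue drained: this worker simply finishes
        end
      end
    end.each(&:join)
    self
  end
end

results = Queue.new
pool = NaivePool.new(4)
10.times { |i| pool.process(i) { |j| results << j * j } }
pool.wait
squares = Array.new(results.size) { results.pop }.sort
p squares # => [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because every call to wait starts fresh workers and they all exit on an empty queue, the same object can accept a second batch afterwards.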


Constructor Details

#initialize(n = Etc.nprocessors) {|self| ... } ⇒ Pool

Allocates a new pool. If no block is given, it just returns the new object. Otherwise, it evaluates the given block, expecting the block to register some jobs on the pool, and then waits for them to finish, i.e.:

Pool.new do |p|
  ...
end

is a shorthand for:

p = Pool.new
begin
  ...
ensure
  p.wait
end
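The point of the ensure clause is that wait runs even if the block raises. The sketch below isolates that constructor pattern; TinyPool is hypothetical and runs its jobs sequentially, so only the yield/ensure control flow mirrors the real class.

```ruby
# Hypothetical TinyPool: only demonstrates the "yield self, then always wait"
# constructor pattern. Jobs run sequentially here, not on threads.
class TinyPool
  def initialize
    @jobs = []
    return unless block_given?

    begin
      yield self
    ensure
      wait # runs even when the block raises
    end
  end

  def process(*argv, &job)
    @jobs << [job, argv]
    self
  end

  def wait
    @jobs.each { |job, argv| job.call(*argv) }
    @jobs.clear
    self
  end
end

# Block form: jobs registered inside the block have run before `new` returns.
out = []
TinyPool.new { |pool| pool.process(1) { |x| out << x } }
p out # => [1]

# Even when the block raises, the queued job still runs; the error propagates.
ran = []
begin
  TinyPool.new do |pool|
    pool.process(:a) { |x| ran << x }
    raise 'oops'
  end
rescue RuntimeError
  # expected: the block's exception reaches the caller after wait has run
end
p ran # => [:a]
```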

Parameters:

  • n (Integer) (defaults to: Etc.nprocessors)

    parallelism, i.e. the number of worker threads.

Yield Parameters:

  • self (Pool)

    the pool itself, on which the block registers jobs.

Raises:

  • (ArgumentError)

    if n is zero or negative.

  • (TypeError)

    if n is not an Integer.



# File 'lib/sized_parallel/pool.rb', line 64

def initialize n = Etc.nprocessors
  case n when Integer then
    raise ArgumentError, 'non-positive number makes no sense' if n <= 0
    @n = n
    @q = Queue.new
  else
    raise TypeError, 'not a number'
  end

  return unless defined? yield

  begin
    yield self
  ensure
    wait
  end
end
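The argument checks above accept only positive Integers. The following sketch reproduces that case/when logic in a hypothetical helper, check_parallelism (not part of the gem), so the accepted and rejected inputs can be tried without creating threads.

```ruby
require 'etc'

# Hypothetical helper mirroring the type and range checks in #initialize.
def check_parallelism(n = Etc.nprocessors)
  case n
  when Integer
    raise ArgumentError, 'non-positive number makes no sense' if n <= 0
    n
  else
    raise TypeError, 'not a number'
  end
end

# Returns the class of the error raised for n, or nil if n is accepted.
def error_class_for(n)
  check_parallelism(n)
  nil
rescue ArgumentError, TypeError => e
  e.class
end

p error_class_for(4)   # => nil
p error_class_for(0)   # => ArgumentError
p error_class_for(2.0) # => TypeError (a Float is not an Integer)
p error_class_for('4') # => TypeError
```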

Instance Method Details

#process(*argv) {|*argv| ... } ⇒ self

Note:

What on earth are these arguments that seem completely useless? Well, the problem they work around is inter-thread variable scope. For instance, it is a WRONG idea to write code like this:

for i in 0...1024 do Thread.start { puts i } end # WRONG WRONG WRONG WRONG

The code above behaves unexpectedly because a for loop does not open a new scope: the single variable i is shared by all threads and gets overwritten while they are still running. A valid usage is below:

for i in 0...1024 do Thread.start(i) { |j| puts j } end
#                                ^^^   ^^^      ^

Note the newly introduced block parameter j: each thread receives its own copy of the value. Exactly the same reasoning applies to this method, because the block passed to it is later run on one of the pool's threads.
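The sharing problem is easy to observe. In this sketch, lambdas stand in for threads in the first loop so the result is deterministic; the second loop uses real threads and Thread#value to collect what each one received. (A block parameter such as the i in 1024.times { |i| ... } is created afresh on every iteration, which is why a for loop is used to show the problem.)

```ruby
# Closures created inside a `for` loop all capture the same variable i...
shared = []
for i in 0...3 do
  shared << -> { i }
end
p shared.map(&:call) # => [2, 2, 2]

# ...whereas passing the value as a thread argument gives each thread its own j.
threads = []
for i in 0...3 do
  threads << Thread.start(i) { |j| j }
end
p threads.map(&:value) # => [0, 1, 2]
```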

Registers a job: the given block, to be called with argv. Jobs are only queued here; they run when #wait is called.

Parameters:

  • argv (...)

    passed to the block verbatim.

Yield Parameters:

  • *argv (...)

    what was passed to the method.

Returns:

  • (self)


# File 'lib/sized_parallel/pool.rb', line 103

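def process *argv, &job
  # Capture the block explicitly; the block-less Proc.new idiom raises
  # ArgumentError on Ruby 3.0 and later.
  raise ArgumentError, 'no block given' unless job
  argv.unshift job
  @q.enq argv
  return self
end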
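Each queue entry is a single array holding the job first and its arguments after it; a worker later splits it back apart with job, *argv = *a. The sketch below shows that round trip with a hypothetical queue_job helper (not the gem's API) that captures the block through an explicit &job parameter.

```ruby
# Hypothetical stand-in for #process: store [block, *arguments] in a queue.
def queue_job(queue, *argv, &job)
  raise ArgumentError, 'no block given' unless job
  queue << argv.unshift(job)
  queue
end

q = Queue.new
queue_job(q, 2, 3) { |a, b| a + b }
queue_job(q, 'x') { |s| s * 3 }

# A worker splits each entry back into the job and its arguments.
results = []
until q.empty?
  job, *argv = q.pop
  results << job.call(*argv)
end
p results # => [5, "xxx"]
```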

#wait ⇒ self

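This method starts the worker threads and blocks until every registered job has finished; jobs do not run before it is called. Once it returns, the thread pool is back in its initial state, ready to be used again. A job that raises is silently ignored, unless Thread.abort_on_exception is true, in which case the worker stops and re-raises the exception.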

Returns:

  • (self)
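Workers know when to stop because Queue#pop with a true argument does not block: it raises ThreadError when the queue is empty. The snippet isolates that idiom using only the standard Queue class; it is an illustration, not code taken from the gem.

```ruby
q = Queue.new
[1, 2, 3].each { |x| q << x }

drained = []
begin
  loop { drained << q.pop(true) } # non-blocking pop
rescue ThreadError
  # empty queue: the normal way out of the loop
end
p drained  # => [1, 2, 3]
p q.empty? # => true, so the same queue can be filled and drained again
```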


# File 'lib/sized_parallel/pool.rb', line 113

def wait
  Array.new @n do
    Thread.start do
      failed = false
      begin
        while a = @q.deq(true) do
          # rubocop:disable Style/EmptyElse, Lint/RescueException
          begin
            job, *argv = *a
            job.call(*argv)
          rescue Exception => e
            if Thread.abort_on_exception then
              # When a  job ends abnormally, we  are not sure what  to do for
              # it.  Can't but just  follow abort_on_exception global setting
              # for now.  Should there be a better way.
              failed = e
              break
            else
              # silently ignore
            end
          end
          # rubocop:enable Style/EmptyElse, Lint/RescueException
        end
        # Reaching here indicates the abnormal end-of-job. tricky!
      rescue ThreadError
        # This  ThreadError can  only  come from  @q.deq,  indicates no  more
        # job(s) to call.
        Thread.exit
      end
      raise failed if failed
    end
  end.each(&:join)
  return self
end
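A worker that executes raise failed terminates with that exception, and Thread#join re-raises it in the joining thread, so the each(&:join) at the end of wait lets the failure surface to the caller. (With Thread.abort_on_exception set, Ruby additionally raises it in the main thread as soon as the worker dies.) The sketch shows the join behaviour in isolation, under default settings.

```ruby
worker = Thread.start do
  Thread.current.report_on_exception = false # keep stderr quiet for the demo
  raise ArgumentError, 'job failed'
end

message =
  begin
    worker.join # re-raises the worker's exception here
    nil
  rescue ArgumentError => e
    e.message
  end
p message # => "job failed"
```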