Class: TPX

Inherits:
Object
  • Object
show all
Defined in:
lib/tpx.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ TPX

Returns a new instance of TPX.



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/tpx.rb', line 3

def initialize(size, opts = {})
  tix = opts[:tix]

  @size = size
  @jobs = Queue.new

  @pool = Array.new(size) do
    Thread.new do
      Thread.current[:tix] = tix if tix
      catch(:exit) do
        loop do
          begin
            jix, job, args, out = @jobs.pop
            out << [jix, job.call(*args)]
          rescue => e
            out << 'ERROR'
          end
        end
      end
    end
  end
end

Instance Method Details

#schedule(args, &block) ⇒ Object



26
27
28
29
30
# File 'lib/tpx.rb', line 26

def schedule(args, &block)
  out = Queue.new
  @jobs << [0, block, args, out]
  (out.pop)[1]
end

#schedule_m_add(args, &block) ⇒ Object



36
37
38
39
40
41
# File 'lib/tpx.rb', line 36

def schedule_m_add(args, &block)
  out = args[0]
  jix = args[1]
  args = args[2]
  @jobs << [jix, block, args, out]
end

#schedule_m_buildObject



32
33
34
# File 'lib/tpx.rb', line 32

def schedule_m_build
  Queue.new
end

#schedule_m_read(out, n) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/tpx.rb', line 43

def schedule_m_read(out, n)
  res = []

  while res.size < n do
    res << (out.pop)
  end

  res.sort{ |x| x[0] }
end

#shutdownObject



53
54
55
56
57
58
59
# File 'lib/tpx.rb', line 53

def shutdown
  @size.times do
    schedule { throw :exit }
  end

  @pool.map(&:join)
end