Class: TPX::Exec

Inherits:
Object
  • Object
show all
Defined in:
lib/tpx.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ Exec

Returns a new instance of Exec.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/tpx.rb', line 21

def initialize(size, opts = {})
  tix = opts[:tix]

  @size = size
  @jobs = Queue.new

  @pool = Array.new(size) do
    Thread.new do
      Thread.current[:tix] = tix if tix
      catch(:exit) do
        loop do
          begin
            jix, job, args, acc = @jobs.pop
            acc << [jix, job.call(*args)]
          rescue => e
            acc << 'ERROR'
          end
        end
      end
    end
  end
end

Instance Method Details

#schedule(args, &block) ⇒ Object



44
45
46
47
48
# File 'lib/tpx.rb', line 44

def schedule(args, &block)
  acc = Queue.new
  @jobs << [0, block, args, acc]
  (acc.pop)[1]
end

#schedule_m_add(args, &block) ⇒ Object



54
55
56
57
58
59
# File 'lib/tpx.rb', line 54

def schedule_m_add(args, &block)
  acc = args[0]
  jix = args[1]
  args = args[2]
  @jobs << [jix, block, args, acc]
end

#schedule_m_buildObject



50
51
52
# File 'lib/tpx.rb', line 50

def schedule_m_build
  Queue.new
end

#schedule_m_read(acc, n) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/tpx.rb', line 61

def schedule_m_read(acc, n)
  res = []

  while res.size < n do
    res << (acc.pop)
  end

  res
end

#shutdownObject



71
72
73
74
75
76
77
# File 'lib/tpx.rb', line 71

def shutdown
  @size.times do
    schedule { throw :exit }
  end

  @pool.map(&:join)
end