Class: TPX::Exec

Inherits:
Object
  • Object
show all
Defined in:
lib/tpx.rb

Instance Method Summary collapse

Constructor Details

#initialize(size, opts = {}) ⇒ Exec

Returns a new instance of Exec.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/tpx.rb', line 22

# Builds a fixed-size pool of worker threads fed from a shared job queue.
#
# Each worker repeatedly pops a +[jid, job, args, acc]+ tuple, calls the job
# with the splatted args and pushes +[jid, result]+ onto the job's
# accumulator. When the job raises a StandardError the worker pushes the
# string 'ERROR' onto the accumulator instead. A worker stops as soon as a
# job throws +:exit+.
#
# @param size [Integer] number of worker threads to start
# @param opts [Hash] optional settings
# @option opts [Object] :tix value stored in every worker's thread-local
#   +:tix+ slot (skipped when nil or false)
# @return [Exec] a new instance of Exec
def initialize(size, opts = {})
  ticket = opts[:tix]

  @size = size
  @jobs = Queue.new

  # Body shared by every worker thread.
  worker = lambda do
    Thread.current[:tix] = ticket if ticket
    catch(:exit) do
      loop do
        begin
          job_id, task, params, sink = @jobs.pop
          sink << [job_id, task.call(*params)]
        rescue StandardError
          # Failures are reported to the caller as a bare marker string.
          sink << 'ERROR'
        end
      end
    end
  end

  @pool = Array.new(size) { Thread.new(&worker) }
end

Instance Method Details

#schedule(args, &block) ⇒ Object



45
46
47
48
49
# File 'lib/tpx.rb', line 45

# Runs a single job on the pool and blocks until its result is available.
#
# The block is queued with job id 0 and a private accumulator queue; the
# calling thread then waits for the worker to push the outcome.
#
# @param args [Array] arguments splatted into the block by the worker
# @yield [*args] the job to execute on a worker thread
# @return [Object] the block's return value, or the string 'ERROR' when the
#   block raised a StandardError
def schedule(args, &block)
  acc = Queue.new
  @jobs << [0, block, args, acc]
  outcome = acc.pop

  # A successful job arrives as [jid, value]; a failed one arrives as the bare
  # 'ERROR' string. Indexing that string with [1] would return "R", so hand
  # the marker back untouched.
  outcome.is_a?(Array) ? outcome[1] : outcome
end

#schedule_m_add(args, &block) ⇒ Object



55
56
57
58
59
60
# File 'lib/tpx.rb', line 55

# Queues one job of a multi-job batch without waiting for its result.
#
# @param args [Array] a three-element tuple: the accumulator the result is
#   pushed to, the job id, and the arguments to splat into the block
# @yield the job to execute on a worker thread
# @return [Object] the job queue (the result of pushing onto it)
def schedule_m_add(args, &block)
  sink, job_id, call_args = args[0], args[1], args[2]
  @jobs << [job_id, block, call_args, sink]
end

#schedule_m_build ⇒ Object



51
52
53
# File 'lib/tpx.rb', line 51

# Creates a fresh, empty queue.
#
# NOTE(review): presumably this is the accumulator handed to #schedule_m_add
# and later drained by #schedule_m_read; confirm against callers.
#
# @return [Queue] a new empty queue
def schedule_m_build
  Queue.new
end

#schedule_m_read(acc, n) ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/tpx.rb', line 62

# Collects +n+ entries from an accumulator, blocking until each is available.
#
# @param acc [#pop] the accumulator to read from
# @param n [Numeric] how many entries to collect
# @return [Array] the popped entries, in the order they were popped
def schedule_m_read(acc, n)
  collected = []
  collected << acc.pop while collected.size < n
  collected
end

#shutdown ⇒ Object



72
73
74
75
76
77
78
# File 'lib/tpx.rb', line 72

# Stops every worker thread and waits for all of them to finish.
#
# One exit job is queued per worker. Each exit job throws +:exit+, which
# unwinds the worker's +catch(:exit)+ loop and ends that thread. Jobs queued
# earlier are still processed first because the queue is first-in, first-out.
#
# @return [Array<Thread>] the joined worker threads
def shutdown
  # Enqueue directly rather than through #schedule: #schedule requires an
  # args parameter and blocks waiting for a result, but a worker that throws
  # :exit never pushes one, so the caller would wait forever.
  @size.times do
    @jobs << [nil, -> { throw :exit }, [], Queue.new]
  end

  @pool.map(&:join)
end