Class: PCOWS

Inherits:
Object
  • Object
show all
Defined in:
lib/bio-vcf/pcows.rb

Constant Summary collapse

RUNNINGEXT =
'part'

Instance Method Summary collapse

Constructor Details

#initialize(num_threads, name = File.basename(__FILE__), timeout = 180) ⇒ PCOWS

Returns a new instance of PCOWS.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/bio-vcf/pcows.rb', line 9

# Set up the bookkeeping for a pool of forked workers.
#
# @param num_threads [Integer, nil] maximum number of concurrent workers;
#   when nil/false the value returned by cpu_count is used instead
# @param name [String] prefix used for the temporary directory name
# @param timeout [Integer] number of seconds stored in @timeout for the
#   waiting methods
def initialize(num_threads, name = File.basename(__FILE__), timeout = 180)
  # FIXME: set to cpu_num by default
  @num_threads = num_threads || cpu_count
  # $stderr.print "Using ",num_threads,"threads \n"
  @name = name
  @timeout = timeout
  @pid_list = []      # one [pid, count, filename] entry per submitted worker
  @last_output = 0    # counter: index into @pid_list of the next output
  @output_locked = nil
  # The scratch directory is only created when running multi-threaded
  @tmpdir = Dir.mktmpdir(@name + '_') if multi_threaded
end

Instance Method Details

#process_output(func = nil, type = :by_line, blocking = false) ⇒ Object

In this section the output gets collected and passed on to a printer
process (a forked child, unless blocking is requested). This function
makes sure the printing is ordered and that no printers are running at
the same time. The printer should be doing as little processing
as possible.

In this implementation type==:by_line prints each line of the output
file to standard output (func is not used). Otherwise func is called
once with the filename.


85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/bio-vcf/pcows.rb', line 85

# Emit the next pending worker result, in submission order, if its output
# file is available. Does nothing when running single-threaded.
#
# Results are tracked through @pid_list (one [pid, count, filename] entry
# per submitted worker) and @last_output (index of the next entry to emit).
# While a non-blocking printer child is busy, @output_locked holds the entry
# being printed; the printer deletes the file when done, which is how the
# parent notices completion on a later call.
#
# @param func [#call, nil] called once with the filename when type is not
#   :by_line; unused for :by_line
# @param type [Symbol] :by_line prints every line of the file to standard
#   output, any other value hands the filename to func
# @param blocking [Boolean] when true the output is produced in the calling
#   process; otherwise it is produced in a forked child
def process_output(func=nil,type = :by_line, blocking=false)
  return if single_threaded
  output = lambda { |fn|
    if type == :by_line
      # File.foreach closes the file once all lines have been read
      File.foreach(fn) { |buf| print buf }
    else
      func.call(fn)
    end
    File.unlink(fn)
  }
  if @output_locked
    (_pid,_count,fn) = @output_locked
    return if File.exist?(fn)  # still processing
    # on to the next one
    @last_output += 1
    @output_locked = nil
  end
  info = @pid_list[@last_output]
  return unless info
  (_pid,_count,fn) = info
  $stderr.print "Processing #{fn}\n"
  return unless File.exist?(fn)
  # Yes! We have the next output
  if blocking
    output.call(fn)
    # The file has been consumed synchronously, so advance right away;
    # otherwise repeated blocking calls would keep looking at this slot.
    @last_output += 1
  else
    printer_pid = fork do
      output.call(fn)
      exit(0)
    end
    # Nobody waits on the printer child; detach so it does not linger as a zombie
    Process.detach(printer_pid)
    @output_locked = info
  end
end

#process_remaining_output ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/bio-vcf/pcows.rb', line 156

# Final output pass. Does nothing when running single-threaded.
#
# First polls process_output every 0.2 seconds for as long as
# @output_locked is set, then calls process_output in blocking :by_line
# mode once per entry in @pid_list, and finally removes the temporary
# directory if one was created.
def process_remaining_output()
  return if single_threaded
  while @output_locked
    sleep 0.2
    process_output
  end
  @pid_list.size.times { process_output(nil, :by_line, true) }
  # final cleanup
  Dir.rmdir(@tmpdir) if @tmpdir
end

#submit_worker(func, state) ⇒ Object

Feed the worker func and state to COWS. Note that func is a lambda closure so it can pick up surrounding scope at invocation in addition to the data captured in ‘state’.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/bio-vcf/pcows.rb', line 27

# Run func.call(state) and print every element it yields.
#
# Single-threaded: the call happens in the current process, output goes
# straight to standard output, and a [nil, nil, nil] entry is recorded.
#
# Multi-threaded: the call happens in a forked child whose STDOUT is
# redirected to "<fn>.<RUNNINGEXT>"; once the child has finished writing,
# that file is renamed to fn. The parent records [pid, count, fn].
#
# @param func [#call] worker; its result must respond to each
# @param state [Object] argument passed to func
# @return [true]
def submit_worker(func,state)
  unless multi_threaded
    # ---- Call in main process and output immediately
    func.call(state).each { |line| print line }
    @pid_list << [nil, nil, nil]
    return true
  end

  count = @pid_list.size + 1
  fn = mktmpfilename(count)
  pid = fork do
    # ---- This is running a new copy-on-write process
    running_fn = fn + '.' + RUNNINGEXT
    STDOUT.reopen(File.open(running_fn, 'w+'))
    func.call(state).each { |line| print line }
    STDOUT.flush
    STDOUT.close
    # The final name only appears after all output has been written
    FileUtils.mv(running_fn, fn)
    exit 0
  end
  @pid_list << [pid, count, fn]
  true
end

#wait_for_worker(info) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/bio-vcf/pcows.rb', line 122

# Wait until the result file of one worker exists.
#
# Nothing happens unless pid_or_file_running? reports the worker as active.
# Otherwise the result file is polled every 0.2 seconds for at most
# @timeout seconds. On timeout the child is killed (signal 9) and reaped if
# pid_running? says it is still alive, and a FATAL message is written to
# $stderr; the timeout itself is not re-raised.
#
# @param info [Array] a [pid, count, filename] entry from @pid_list
# @raise [RuntimeError] if the result file is missing after the wait ended
def wait_for_worker(info)
  pid, _count, fn = info
  return unless pid_or_file_running?(pid, fn)

  $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete\n"
  begin
    Timeout.timeout(@timeout) do
      # wait for the result to appear
      sleep 0.2 until File.exist?(fn)
    end
    # Thread file should have gone:
    raise "FATAL: child process appears to have crashed #{fn}" unless File.exist?(fn)
    $stderr.print "OK pid=#{pid}, processing #{fn}\n"
  rescue Timeout::Error
    if pid_running?(pid)
      # Kill it to speed up exit
      Process.kill 9, pid
      Process.wait pid
    end
    $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}\n"
  end
end

#wait_for_worker_slot ⇒ Object

Make sure no more than num_threads are running at the same time - this is achieved by checking the PID table and the running files in the tmpdir



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/bio-vcf/pcows.rb', line 54

# Block until fewer than @num_threads workers are active.
# Does nothing when running single-threaded.
#
# Every 0.1 seconds the entries of @pid_list are checked with
# pid_or_file_running?. The whole wait is wrapped in Timeout.timeout(@timeout),
# so a Timeout::Error propagates to the caller if no slot frees up in time.
def wait_for_worker_slot()
  return if single_threaded
  Timeout.timeout(@timeout) do
    loop do
      # ---- count running pids
      busy = @pid_list.count { |pid, _count, fn| pid_or_file_running?(pid, fn) }
      return if busy < @num_threads
      $stderr.print "Waiting for slot (timeout=#{@timeout})\n"
      sleep 0.1
    end
  end
end

#wait_for_workers ⇒ Object

This is the final cleanup after the reader thread is done. All workers need to complete.



149
150
151
152
153
154
# File 'lib/bio-vcf/pcows.rb', line 149

# Call wait_for_worker for every entry in @pid_list, in submission order.
# Does nothing when running single-threaded.
def wait_for_workers()
  return if single_threaded
  @pid_list.each { |worker| wait_for_worker(worker) }
end