Class: PCOWS
- Inherits: Object
  - Object
    - PCOWS
- Defined in:
- lib/bio-vcf/pcows.rb
Constant Summary
- RUNNINGEXT = 'part' (extension appended to a worker's output file while it is still being written)
Instance Method Summary
- #initialize(num_threads, name = File.basename(__FILE__), timeout = 180) ⇒ PCOWS
  constructor
  A new instance of PCOWS.
- #process_output(func = nil, type = :by_line, blocking = false) ⇒ Object
  In this section the output gets collected and passed on to a printer process.
- #process_remaining_output ⇒ Object
- #submit_worker(func, state) ⇒ Object
  Feed the worker func and state to COWS.
- #wait_for_worker(info) ⇒ Object
- #wait_for_worker_slot ⇒ Object
  Makes sure no more than num_threads workers run at the same time; this is achieved by checking the PID table and the running files in the tmpdir.
- #wait_for_workers ⇒ Object
  This is the final cleanup after the reader thread is done.
Constructor Details
#initialize(num_threads, name = File.basename(__FILE__), timeout = 180) ⇒ PCOWS
Returns a new instance of PCOWS.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/bio-vcf/pcows.rb', line 9
# Set up a new scheduler.
#
# @param num_threads [Integer, nil] maximum number of concurrent workers;
#   when nil (or false) the value of cpu_count is used instead
# @param name [String] prefix for the temporary output directory; defaults
#   to the basename of the file defining this method
# @param timeout [Numeric] seconds to wait in the Timeout-guarded loops
def initialize(num_threads, name = File.basename(__FILE__), timeout = 180)
  @num_threads = num_threads || cpu_count
  @pid_list = []
  @name = name
  @timeout = timeout
  # multi_threaded is evaluated only after the fields above are set, since it
  # may depend on them. The tmpdir is only needed when workers are forked.
  @tmpdir = Dir.mktmpdir(@name + '_') if multi_threaded
  @last_output = 0     # index into @pid_list of the next chunk to print
  @output_locked = nil # @pid_list entry currently held by a printer, if any
end
Instance Method Details
#process_output(func = nil, type = :by_line, blocking = false) ⇒ Object
In this section the output gets collected and passed on to a
printer process (a forked child, unless blocking is true). This function makes sure the printing is
ordered and that no two printers are running at the same
time. The printer should be doing as little processing
as possible.
In this implementation type == :by_line prints each line of the
output file as-is (func is not used). Otherwise func is called once with the filename.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/bio-vcf/pcows.rb', line 85
# Hand finished worker output to a printer, strictly in submission order,
# with at most one printer active at a time. Each call advances by at most
# one chunk; call it repeatedly (polling) to drain the queue.
#
# @param func [#call, nil] called once with the output filename when type
#   is not :by_line; unused for :by_line
# @param type [Symbol] :by_line prints the chunk file line by line to
#   $stdout; any other value delegates to func
# @param blocking [Boolean] false: print in a forked child and return
#   immediately; true: print in the current process before returning
# @return [void]
def process_output(func = nil, type = :by_line, blocking = false)
  return if single_threaded
  output = lambda do |fn|
    if type == :by_line
      # File.foreach closes the file when done (File.new(...).each_line
      # left the handle open).
      File.foreach(fn) { |buf| print buf }
    else
      func.call(fn)
    end
    File.unlink(fn)
  end
  if @output_locked
    _pid, _count, fn = @output_locked
    # The printer unlinks its chunk when finished; while the file is still
    # there the previous printer is busy, so do not start another one.
    return if File.exist?(fn)
    # on to the next one
    @last_output += 1
    @output_locked = nil
  end
  info = @pid_list[@last_output]
  return unless info
  _pid, _count, fn = info
  $stderr.print "Processing #{fn}\n"
  return unless File.exist?(fn) # worker has not produced this chunk yet
  if blocking
    output.call(fn)
    # Printed synchronously, so move the cursor on right away. Without this
    # every later blocking call would look at the already-unlinked chunk
    # again and print nothing.
    @last_output += 1
  else
    printer = fork do
      output.call(fn)
      exit(0)
    end
    # Reap the printer in the background so it does not linger as a zombie.
    Process.detach(printer)
    @output_locked = info
  end
end
#process_remaining_output ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/bio-vcf/pcows.rb', line 156
# Print whatever output is still pending, then remove the temporary
# directory. Does nothing in single-threaded mode.
#
# NOTE(review): Dir.unlink only removes an empty directory; it raises if
# anything is left in @tmpdir (e.g. an unprinted chunk or a leftover
# running file).
def process_remaining_output
  return if single_threaded
  # Poll until no forked printer holds the output lock any more.
  while @output_locked
    sleep(0.2)
    process_output
  end
  # Then issue one blocking process_output call per entry in @pid_list.
  @pid_list.size.times { process_output(nil, :by_line, true) }
  # final cleanup
  Dir.unlink(@tmpdir) if @tmpdir
end
#submit_worker(func, state) ⇒ Object
Feed the worker func and state to COWS. Note that func is a lambda closure so it can pick up surrounding scope at invocation in addition to the data captured in ‘state’.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/bio-vcf/pcows.rb', line 27
# Run func.call(state) as a worker and record it in @pid_list.
#
# func.call(state) must return something that responds to #each; every
# element is printed. func is a closure, so it may also use surrounding
# scope in addition to the data captured in state.
#
# Multi-threaded: the call happens in a forked child whose STDOUT is
# redirected to "<fn>.<RUNNINGEXT>". The file is renamed to fn only after
# all output has been written and flushed. The entry [pid, count, fn] is
# appended to @pid_list.
# Single-threaded: the call happens in this process, output is printed
# immediately, and a [nil, nil, nil] placeholder is appended.
#
# @return [true]
def submit_worker(func, state)
  unless multi_threaded
    # ---- Call in main process and output immediately
    func.call(state).each { |line| print line }
    @pid_list << [nil, nil, nil]
    return true
  end

  count = @pid_list.size + 1
  fn = mktmpfilename(count)
  pid = fork do
    # ---- Child process: write under the "running" name first, rename when done
    running_fn = fn + '.' + RUNNINGEXT
    STDOUT.reopen(File.open(running_fn, 'w+'))
    func.call(state).each { |line| print line }
    STDOUT.flush
    STDOUT.close
    FileUtils.mv(running_fn, fn)
    exit(0)
  end
  @pid_list << [pid, count, fn]
  true
end
#wait_for_worker(info) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/bio-vcf/pcows.rb', line 122
# Wait (up to @timeout seconds) for one worker's output file to appear.
#
# @param info [Array] a [pid, count, fn] entry from @pid_list
# Returns immediately when pid_or_file_running?(pid, fn) is false.
# On timeout, a worker that pid_running? still reports is killed with
# signal 9 and waited for. The failure is only reported on $stderr and is
# not re-raised.
#
# NOTE(review): the wait loop only exits once fn exists, so the "crashed"
# check below can only fire if fn vanishes in between (e.g. a printer already
# consumed it). A worker that dies without output ends in the timeout branch
# instead.
# NOTE(review): on the success path pid is not reaped here; confirm it is
# reaped elsewhere.
def wait_for_worker(info)
  pid, _count, fn = info
  return unless pid_or_file_running?(pid, fn)

  $stderr.print "Waiting up to #{@timeout} seconds for pid=#{pid} to complete\n"
  begin
    # wait for the result to appear
    Timeout.timeout(@timeout) { sleep 0.2 until File.exist?(fn) }
    raise "FATAL: child process appears to have crashed #{fn}" unless File.exist?(fn)
    $stderr.print "OK pid=#{pid}, processing #{fn}\n"
  rescue Timeout::Error
    if pid_running?(pid)
      # Kill it to speed up exit
      Process.kill(9, pid)
      Process.wait(pid)
    end
    $stderr.print "FATAL: child process killed because it stopped responding, pid = #{pid}\n"
  end
end
#wait_for_worker_slot ⇒ Object
Makes sure no more than num_threads workers run at the same time: blocks until a slot is free. This is achieved by checking the PID table and the running files in the tmpdir.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/bio-vcf/pcows.rb', line 54
# Block until fewer than @num_threads entries of @pid_list are reported as
# running by pid_or_file_running?, polling every 0.1 s. Does nothing in
# single-threaded mode.
#
# @raise [Timeout::Error] if no slot frees up within @timeout seconds
# NOTE(review): every poll rescans the whole @pid_list, including workers
# that finished long ago, so each check gets slower as more chunks are
# submitted.
def wait_for_worker_slot
  return if single_threaded
  Timeout.timeout(@timeout) do
    loop do
      # ---- count running pids
      busy = @pid_list.count { |pid, _count, fn| pid_or_file_running?(pid, fn) }
      return if busy < @num_threads
      $stderr.print "Waiting for slot (timeout=#{@timeout})\n"
      sleep 0.1
    end
  end
end
#wait_for_workers ⇒ Object
This is the final cleanup after the reader thread is done. All workers need to complete.
149 150 151 152 153 154 |
# File 'lib/bio-vcf/pcows.rb', line 149
# Final cleanup once the reader is done: wait for every submitted worker in
# submission order via wait_for_worker. Returns nil in single-threaded mode
# and @pid_list otherwise.
def wait_for_workers
  return if single_threaded
  @pid_list.each(&method(:wait_for_worker))
end