Class: Pwrake::Invoker
- Inherits:
-
Object
- Object
- Pwrake::Invoker
- Defined in:
- lib/pwrake/worker/invoker.rb
Instance Method Summary collapse
- #close_all ⇒ Object
- #command_loop ⇒ Object
- #common_line(line) ⇒ Object
- #get_line ⇒ Object
-
#initialize(dir_class, ncore, option) ⇒ Invoker
constructor
A new instance of Invoker.
- #kill_all(sig) ⇒ Object
-
#processor_count ⇒ Object
from Michael Grosser’s parallel github.com/grosser/parallel.
- #run ⇒ Object
- #setup_loop ⇒ Object
- #setup_option ⇒ Object
- #start_heartbeat ⇒ Object
Constructor Details
#initialize(dir_class, ncore, option) ⇒ Invoker
Returns a new instance of Invoker.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/pwrake/worker/invoker.rb', line 5 def initialize(dir_class, ncore, option) @dir_class = dir_class @option = option @out = Writer.instance # firstly replace $stderr @log = LogExecutor.instance @log.init(@option) @log.open(@dir_class) @out.add_logger(@log) ncore_max = processor_count() if ncore.kind_of?(Integer) if ncore > 0 @ncore = ncore else @ncore = ncore_max + ncore end if @ncore <= 0 m = "Out of range: ncore=#{ncore.inspect}" @out.puts "ncore:"+m raise ArgumentError,m end elsif ncore.nil? @ncore = ncore_max else m = "Invalid argument: ncore=#{ncore.inspect}" @out.puts "ncore:"+m raise ArgumentError,m end @out.puts "ncore:#{@ncore}" # does NOT exit when writing to broken pipe Signal.trap("PIPE", "SIG_IGN") end |
Instance Method Details
#close_all ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/pwrake/worker/invoker.rb', line 144 def close_all @log.info "close_all" @heartbeat_thread.kill if @heartbeat_thread Dir.chdir id_list = Executor::LIST.keys ex_list = Executor::LIST.values ex_list.each{|ex| ex.close} begin ex_list.each{|ex| ex.join} rescue => e @log.error e @log.error e.backtrace.join("\n") end @log.info "worker:end:#{id_list.inspect}" begin Timeout.timeout(20){@log.close} rescue => e $stdout.puts e $stdout.puts e.backtrace.join("\n") end @out.puts "exited" end |
#command_loop ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/pwrake/worker/invoker.rb', line 97 def command_loop while line = get_line case line when /^(\d+):(.*)$/o id,cmd = $1,$2 ex = Executor::LIST[id] if ex.nil? if cmd=="exit" @out.puts "#{id}:end" next else ex = Executor.new(@dir_class,id,@shell_cmd,@shell_rc) end end ex.execute(cmd) else break if common_line(line) end end end |
#common_line(line) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/pwrake/worker/invoker.rb', line 118 def common_line(line) case line when /^exit$/o return true # when /^kill:(.*)$/o kill_all($1) return false # when /^p$/o puts "Executor::LIST = #{Executor::LIST.inspect}" return false # else msg = "invalid line: #{line}" @log.fatal msg raise RuntimeError,msg end end |
#get_line ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/pwrake/worker/invoker.rb', line 37 def get_line begin line = $stdin.gets exit if !line line.chomp! line.strip! @log.info ">#{line}" return line rescue exit end end |
#kill_all(sig) ⇒ Object
138 139 140 141 142 |
# File 'lib/pwrake/worker/invoker.rb', line 138 def kill_all(sig) sig = sig.to_i if /^\d+$/o =~ sig @log.warn "worker_killed:signal=#{sig}" Executor::LIST.each{|id,exc| exc.kill(sig)} end |
#processor_count ⇒ Object
from Michael Grosser’s parallel github.com/grosser/parallel
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/pwrake/worker/invoker.rb', line 169 def processor_count host_os = RbConfig::CONFIG['host_os'] case host_os when /linux|cygwin/ ncpu = 0 open("/proc/cpuinfo").each do |l| ncpu += 1 if /^processor\s+: \d+/=~l end ncpu when /darwin9/ `hwprefs cpu_count`.to_i when /darwin/ (hwprefs_available? ? `hwprefs thread_count` : `sysctl -n hw.ncpu`).to_i when /(open|free)bsd/ `sysctl -n hw.ncpu`.to_i when /mswin|mingw/ require 'win32ole' wmi = WIN32OLE.connect("winmgmts://") cpu = wmi.ExecQuery("select NumberOfLogicalProcessors from Win32_Processor") cpu.to_enum.first.NumberOfLogicalProcessors when /solaris2/ `psrinfo -p`.to_i # physical cpus else raise "Unknown architecture: #{host_os}" end end |
#run ⇒ Object
50 51 52 53 54 55 56 57 58 |
# File 'lib/pwrake/worker/invoker.rb', line 50 def run setup_option if setup_loop start_heartbeat command_loop end ensure close_all end |
#setup_loop ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/pwrake/worker/invoker.rb', line 70 def setup_loop while line = get_line case line when /^(\d+):open$/o $1.split.each do |id| Executor.new(@dir_class,id,@shell_cmd,@shell_rc) end when "setup_end" return true else return false if common_line(line) end end false end |
#setup_option ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/pwrake/worker/invoker.rb', line 60 def setup_option @log.info @option.inspect @heartbeat_interval = @option[:heartbeat] @shell_cmd = @option[:shell_command] @shell_rc = @option[:shell_rc] || [] (@option[:pass_env]||{}).each do |k,v| ENV[k] = v end end |
#start_heartbeat ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/pwrake/worker/invoker.rb', line 86 def start_heartbeat if @heartbeat_interval @heartbeat_thread = Thread.new do while true @out.puts "heartbeat" sleep @heartbeat_interval end end end end |