Class: Pwrake::Invoker
- Inherits:
-
Object
- Object
- Pwrake::Invoker
- Includes:
- Parallel::ProcessorCount
- Defined in:
- lib/pwrake/worker/invoker.rb
Instance Method Summary collapse
- #close_all ⇒ Object
- #command_loop ⇒ Object
- #common_line(line) ⇒ Object
- #get_line ⇒ Object
-
#initialize(dir_class, ncore, option) ⇒ Invoker
constructor
A new instance of Invoker.
- #kill_all(sig) ⇒ Object
- #run ⇒ Object
- #setup_loop ⇒ Object
- #setup_option ⇒ Object
- #start_heartbeat ⇒ Object
Constructor Details
#initialize(dir_class, ncore, option) ⇒ Invoker
Returns a new instance of Invoker.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/pwrake/worker/invoker.rb', line 8 def initialize(dir_class, ncore, option) @dir_class = dir_class @option = option @out = Writer.instance # firstly replace $stderr @log = LogExecutor.instance @log.init(@option) @log.open(@dir_class) @out.add_logger(@log) ncore_max = processor_count() if ncore.kind_of?(Integer) if ncore > 0 @ncore = ncore else @ncore = ncore_max + ncore end if @ncore <= 0 m = "Out of range: ncore=#{ncore.inspect}" @out.puts "ncore:"+m raise ArgumentError,m end elsif ncore.nil? @ncore = ncore_max else m = "Invalid argument: ncore=#{ncore.inspect}" @out.puts "ncore:"+m raise ArgumentError,m end @out.puts "ncore:#{@ncore}" # does NOT exit when writing to broken pipe Signal.trap("PIPE", "SIG_IGN") end |
Instance Method Details
#close_all ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/pwrake/worker/invoker.rb', line 147 def close_all @log.info "close_all" @heartbeat_thread.kill if @heartbeat_thread Dir.chdir id_list = Executor::LIST.keys ex_list = Executor::LIST.values ex_list.each{|ex| ex.close} begin ex_list.each{|ex| ex.join} rescue => e @log.error e @log.error e.backtrace.join("\n") end @log.info "worker:end:#{id_list.inspect}" begin Timeout.timeout(20){@log.close} rescue => e $stdout.puts e $stdout.puts e.backtrace.join("\n") end ensure @out.puts "exited" end |
#command_loop ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/pwrake/worker/invoker.rb', line 100 def command_loop while line = get_line case line when /^(\d+):(.*)$/o id,cmd = $1,$2 ex = Executor::LIST[id] if ex.nil? if cmd=="exit" @out.puts "#{id}:end" next else ex = Executor.new(@dir_class,id,@shell_cmd,@shell_rc) end end ex.execute(cmd) else break if common_line(line) end end end |
#common_line(line) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/pwrake/worker/invoker.rb', line 121 def common_line(line) case line when /^exit$/o return true # when /^kill:(.*)$/o kill_all($1) return false # when /^p$/o puts "Executor::LIST = #{Executor::LIST.inspect}" return false # else msg = "invalid line: #{line}" @log.fatal msg raise RuntimeError,msg end end |
#get_line ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/pwrake/worker/invoker.rb', line 40 def get_line begin line = $stdin.gets exit if !line line.chomp! line.strip! @log.info ">#{line}" return line rescue exit end end |
#kill_all(sig) ⇒ Object
141 142 143 144 145 |
# File 'lib/pwrake/worker/invoker.rb', line 141 def kill_all(sig) sig = sig.to_i if /^\d+$/o =~ sig @log.warn "worker_killed:signal=#{sig}" Executor::LIST.each{|id,exc| exc.kill(sig)} end |
#run ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/pwrake/worker/invoker.rb', line 53 def run setup_option if setup_loop start_heartbeat command_loop end ensure close_all end |
#setup_loop ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/pwrake/worker/invoker.rb', line 73 def setup_loop while line = get_line case line when /^(\d+):open$/o $1.split.each do |id| Executor.new(@dir_class,id,@shell_cmd,@shell_rc) end when "setup_end" return true else return false if common_line(line) end end false end |
#setup_option ⇒ Object
63 64 65 66 67 68 69 70 71 |
# File 'lib/pwrake/worker/invoker.rb', line 63 def setup_option @log.info @option.inspect @heartbeat_interval = @option[:heartbeat] @shell_cmd = @option[:shell_command] @shell_rc = @option[:shell_rc] || [] (@option[:pass_env]||{}).each do |k,v| ENV[k] = v end end |
#start_heartbeat ⇒ Object
89 90 91 92 93 94 95 96 97 98 |
# File 'lib/pwrake/worker/invoker.rb', line 89 def start_heartbeat if @heartbeat_interval @heartbeat_thread = Thread.new do while true @out.puts "heartbeat" sleep @heartbeat_interval end end end end |