Class: Pwrake::Invoker

Inherits:
Object
  • Object
show all
Defined in:
lib/pwrake/worker/invoker.rb

Instance Method Summary collapse

Constructor Details

#initialize(dir_class, ncore, option) ⇒ Invoker

Returns a new instance of Invoker.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/pwrake/worker/invoker.rb', line 5

def initialize(dir_class, ncore, option)
  @dir_class = dir_class
  @option = option
  @out = Writer.instance # firstly replace $stderr
  @log = LogExecutor.instance
  @log.init(@option)
  @log.open(@dir_class)
  @out.add_logger(@log)
  ncore_max = processor_count()
  if ncore.kind_of?(Integer)
    if ncore > 0
      @ncore = ncore
    else
      @ncore = ncore_max + ncore
    end
    if @ncore <= 0
      m = "Out of range: ncore=#{ncore.inspect}"
      @out.puts "ncore:"+m
      raise ArgumentError,m
    end
  elsif ncore.nil?
    @ncore = ncore_max
  else
    m = "Invalid argument: ncore=#{ncore.inspect}"
    @out.puts "ncore:"+m
    raise ArgumentError,m
  end
  @out.puts "ncore:#{@ncore}"
  # does NOT exit when writing to broken pipe
  Signal.trap("PIPE", "SIG_IGN")
end

Instance Method Details

#close_allObject



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/pwrake/worker/invoker.rb', line 144

def close_all
  @log.info "close_all"
  @heartbeat_thread.kill if @heartbeat_thread
  Dir.chdir
  id_list = Executor::LIST.keys
  ex_list = Executor::LIST.values
  ex_list.each{|ex| ex.close}
  begin
    ex_list.each{|ex| ex.join}
  rescue => e
    @log.error e
    @log.error e.backtrace.join("\n")
  end
  @log.info "worker:end:#{id_list.inspect}"
  begin
    Timeout.timeout(20){@log.close}
  rescue => e
    $stdout.puts e
    $stdout.puts e.backtrace.join("\n")
  end
  @out.puts "exited"
end

#command_loopObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pwrake/worker/invoker.rb', line 97

def command_loop
  while line = get_line
    case line
    when /^(\d+):(.*)$/o
      id,cmd = $1,$2
      ex = Executor::LIST[id]
      if ex.nil?
        if cmd=="exit"
          @out.puts "#{id}:end"
          next
        else
          ex = Executor.new(@dir_class,id,@shell_cmd,@shell_rc)
        end
      end
      ex.execute(cmd)
    else
      break if common_line(line)
    end
  end
end

#common_line(line) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/pwrake/worker/invoker.rb', line 118

def common_line(line)
  case line
  when /^exit$/o
    return true
    #
  when /^kill:(.*)$/o
    kill_all($1)
    return false
    #
  when /^p$/o
    puts "Executor::LIST = #{Executor::LIST.inspect}"
    return false
    #
  else
    msg = "invalid line: #{line}"
    @log.fatal msg
    raise RuntimeError,msg
  end
end

#get_lineObject



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/pwrake/worker/invoker.rb', line 37

def get_line
  begin
    line = $stdin.gets
    exit if !line
    line.chomp!
    line.strip!
    @log.info ">#{line}"
    return line
  rescue
    exit
  end
end

#kill_all(sig) ⇒ Object



138
139
140
141
142
# File 'lib/pwrake/worker/invoker.rb', line 138

def kill_all(sig)
  sig = sig.to_i if /^\d+$/o =~ sig
  @log.warn "worker_killed:signal=#{sig}"
  Executor::LIST.each{|id,exc| exc.kill(sig)}
end

#processor_countObject

from Michael Grosser’s parallel github.com/grosser/parallel



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/pwrake/worker/invoker.rb', line 169

def processor_count
  host_os = RbConfig::CONFIG['host_os']
  case host_os
  when /linux|cygwin/
    ncpu = 0
    open("/proc/cpuinfo").each do |l|
      ncpu += 1 if /^processor\s+: \d+/=~l
    end
    ncpu
  when /darwin9/
    `hwprefs cpu_count`.to_i
  when /darwin/
    (hwprefs_available? ? `hwprefs thread_count` : `sysctl -n hw.ncpu`).to_i
  when /(open|free)bsd/
    `sysctl -n hw.ncpu`.to_i
  when /mswin|mingw/
    require 'win32ole'
    wmi = WIN32OLE.connect("winmgmts://")
    cpu = wmi.ExecQuery("select NumberOfLogicalProcessors from Win32_Processor")
    cpu.to_enum.first.NumberOfLogicalProcessors
  when /solaris2/
    `psrinfo -p`.to_i # physical cpus
  else
    raise "Unknown architecture: #{host_os}"
  end
end

#runObject



50
51
52
53
54
55
56
57
58
# File 'lib/pwrake/worker/invoker.rb', line 50

def run
  setup_option
  if setup_loop
    start_heartbeat
    command_loop
  end
ensure
  close_all
end

#setup_loopObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/pwrake/worker/invoker.rb', line 70

def setup_loop
  while line = get_line
    case line
    when /^(\d+):open$/o
      $1.split.each do |id|
        Executor.new(@dir_class,id,@shell_cmd,@shell_rc)
      end
    when "setup_end"
      return true
    else
      return false if common_line(line)
    end
  end
  false
end

#setup_optionObject



60
61
62
63
64
65
66
67
68
# File 'lib/pwrake/worker/invoker.rb', line 60

def setup_option
  @log.info @option.inspect
  @heartbeat_interval = @option[:heartbeat]
  @shell_cmd = @option[:shell_command]
  @shell_rc = @option[:shell_rc] || []
  (@option[:pass_env]||{}).each do |k,v|
    ENV[k] = v
  end
end

#start_heartbeatObject



86
87
88
89
90
91
92
93
94
95
# File 'lib/pwrake/worker/invoker.rb', line 86

def start_heartbeat
  if @heartbeat_interval
    @heartbeat_thread = Thread.new do
      while true
        @out.puts "heartbeat"
        sleep @heartbeat_interval
      end
    end
  end
end