Class: Pwrake::WorkerCommunicator
- Inherits:
-
Object
- Object
- Pwrake::WorkerCommunicator
- Defined in:
- lib/pwrake/branch/worker_communicator.rb
Constant Summary collapse
- HOST2ID =
{}
- RE_ID =
'\d+'
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#handler ⇒ Object
readonly
Returns the value of attribute handler.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#ncore ⇒ Object
readonly
Returns the value of attribute ncore.
Class Method Summary collapse
- .read_worker_progs(option) ⇒ Object
Instance Method Summary collapse
- #close ⇒ Object
- #common_line(s) ⇒ Object
-
#initialize(id, host, ncore, runner, option) ⇒ WorkerCommunicator
constructor
A new instance of WorkerCommunicator.
- #ncore_proc(s) ⇒ Object
- #set_ncore(ncore) ⇒ Object
- #setup_connection(worker_code) ⇒ Object
- #start_default_fiber ⇒ Object
Constructor Details
#initialize(id, host, ncore, runner, option) ⇒ WorkerCommunicator
Returns a new instance of WorkerCommunicator.
18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 18
#
# Builds a communicator for one worker host and registers the host in
# HOST2ID.
#
# @param id      identifier stored in @id and recorded as HOST2ID[host]
# @param host    host name stored in @host (also the HOST2ID key)
# @param ncore   core count, stored both as @ncore and as @n_total_core
# @param runner  stored in @runner; other methods of this class pass it to
#                Handler.new and call #heartbeat on it
# @param option  object responding to #worker_progs and #worker_option;
#                the latter is indexed with :heartbeat
def initialize(id, host, ncore, runner, option)
  @id, @host = id, host
  HOST2ID[@host] = @id
  @n_total_core = ncore
  @ncore = @n_total_core
  @runner = runner
  @worker_progs = option.worker_progs
  @option = option.worker_option
  # A heartbeat timeout is only set when a heartbeat value is configured;
  # it is that value plus a fixed margin of 15.
  hb = @option[:heartbeat]
  @heartbeat_timeout = hb + 15 if hb
end
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
7 8 9 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 7
#
# Reader for @channel (nil until something assigns it).
#
# @return [Object] the current value of @channel
def channel
  @channel
end
#handler ⇒ Object (readonly)
Returns the value of attribute handler.
7 8 9 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 7
#
# Reader for @handler (nil until something assigns it).
#
# @return [Object] the current value of @handler
def handler
  @handler
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
7 8 9 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 7
#
# Reader for @host.
#
# @return [Object] the current value of @host
def host
  @host
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
7 8 9 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 7
#
# Reader for @id.
#
# @return [Object] the current value of @id
def id
  @id
end
#ncore ⇒ Object (readonly)
Returns the value of attribute ncore.
7 8 9 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 7
#
# Reader for @ncore.
#
# @return [Object] the current value of @ncore
def ncore
  @ncore
end
Class Method Details
.read_worker_progs(option) ⇒ Object
9 10 11 12 13 14 15 16 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 9
#
# Concatenates the source text of every worker program into one string.
# Each entry of option.worker_progs is resolved to
# "<dir of this file>/../worker/<entry>.rb" and read in list order.
#
# @param option  object responding to #worker_progs (a list of file base
#                names without the ".rb" suffix)
# @return [String] the combined file contents ("" when the list is empty)
def self.read_worker_progs(option)
  worker_dir = File.dirname(__FILE__) + '/../worker/'
  combined = "".dup
  option.worker_progs.each { |prog| combined << IO.read(worker_dir + prog + '.rb') }
  combined
end
Instance Method Details
#close ⇒ Object
54 55 56 57 58 59 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 54
#
# Closes @handler the first time it is called; later calls do nothing.
#
# @return the result of @handler.close on the first call, nil afterwards
def close
  return if @closed

  # Mark as closed before delegating so a repeated call never closes twice.
  @closed = true
  @handler.close
end
#common_line(s) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 80
#
# Handles one line received from the worker.
#
# * "heartbeat"  - calls @runner.heartbeat with @handler.ior
# * "exited"     - makes the method return false
# * "log:<text>" - logs <text> at info level
# * anything else is logged at warn level
#
# @param s  the received line
# @return [Boolean] false for an "exited" line, true otherwise
def common_line(s)
  Log.debug "WorkerCommunicator#common_line(#{s.inspect}) id=#{@id} host=#{@host}"
  continue = true
  case s
  when /^heartbeat$/
    @runner.heartbeat(@handler.ior)
  when /^exited$/
    continue = false
  when /^log:(.*)$/
    Log.info "worker(#{host})>#{Regexp.last_match(1)}"
  else
    Log.warn "worker(#{host}) out> #{s.inspect}"
  end
  continue
end
#ncore_proc(s) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 65
#
# Handles a line that may carry the worker's core count.
#
# A line of the form "ncore:<digits>" updates the core count through
# set_ncore; every line that does not start with "ncore:" is passed on to
# common_line.
#
# @param s  the received line
# @return false after an "ncore:" line was accepted, otherwise whatever
#         common_line(s) returns
# @raise [ArgumentError] if the text after "ncore:" is not all digits
def ncore_proc(s)
  header = /^ncore:(.*)$/.match(s)
  return common_line(s) unless header

  payload = header[1]
  Log.debug "#{payload} @#{@host}"
  digits = /^(\d+)$/.match(payload)
  raise ArgumentError,"WorkerCommunicator#ncore_proc: s=#{s.inspect}" unless digits

  set_ncore(digits[1].to_i)
  false
end
#set_ncore(ncore) ⇒ Object
61 62 63 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 61
#
# Overwrites the stored core count.
#
# @param ncore  new value for @ncore
# @return the value that was assigned
def set_ncore(ncore)
  @ncore = ncore
end
#setup_connection(worker_code) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 32
#
# Launches the worker process for @host and sends it its bootstrap data.
#
# The worker is a `ruby -e` one-liner that reads a fixed amount of program
# text from its standard input and evals it. For the local host names listed
# below it is run directly (after `cd`); for any other host it is run
# through ssh with @option[:ssh_option]. After the process is spawned the
# worker code, @ncore and @option are written to @handler.iow in that order
# (the latter two via Marshal), and a Channel is created on the handler.
#
# @param worker_code [String] Ruby source the worker evals (this is code
#        shipped by the caller, not external input)
# @return the Channel stored in @channel
def setup_connection(worker_code)
  # ARGF.read(length) counts BYTES, so the length sent must be the byte
  # length of the code, not its character count; otherwise multibyte
  # characters leave unread bytes in front of the Marshal data.
  rb_cmd = "ruby -e 'eval ARGF.read(#{worker_code.bytesize})'"
  if ['localhost','localhost.localdomain','127.0.0.1'].include?(@host)
    cmd = "cd; #{rb_cmd}"
  else
    cmd = "ssh -x -T -q #{@option[:ssh_option]} #{@host} \"#{rb_cmd}\""
  end
  Log.debug cmd
  @handler = Handler.new(@runner) do |w0,w1,r2|
    # The three IO objects supplied by Handler become the child's
    # stdout/stderr/stdin; this process closes its copies after spawning.
    @pid = spawn(cmd,:pgroup=>true,:out=>w0,:err=>w1,:in=>r2)
    w0.close
    w1.close
    r2.close
  end
  @handler.host = @host
  iow = @handler.iow
  iow.write(worker_code)
  Marshal.dump(@ncore,iow)
  Marshal.dump(@option,iow)
  @channel = Channel.new(@handler)
end
#start_default_fiber ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/pwrake/branch/worker_communicator.rb', line 95
#
# Creates a fiber that feeds every line from @channel.get_line to
# common_line until common_line returns a falsy value, then logs that the
# fiber ended. The fiber is resumed immediately.
#
# @return whatever Fiber#resume returns for the new fiber
def start_default_fiber
  reader = Fiber.new do
    active = true
    active = common_line(@channel.get_line) while active
    Log.debug "WorkerCommunicator: end of default fiber"
  end
  reader.resume
end