Class: Pwrake::WorkerCommunicator

Inherits:
Object
  • Object
show all
Defined in:
lib/pwrake/branch/worker_communicator.rb

Constant Summary collapse

HOST2ID =
{}
RE_ID =
'\d+'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, host, ncore, runner, option) ⇒ WorkerCommunicator

Returns a new instance of WorkerCommunicator.



18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/pwrake/branch/worker_communicator.rb', line 18

def initialize(id,host,ncore,runner,option)
  @id = id
  @host = host
  HOST2ID[@host] = @id
  @ncore = @n_total_core = ncore
  #
  @runner = runner
  @worker_progs = option.worker_progs
  @option = option.worker_option
  if hb = @option[:heartbeat]
    @heartbeat_timeout = hb + 15
  end
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



7
8
9
# File 'lib/pwrake/branch/worker_communicator.rb', line 7

def channel
  @channel
end

#handlerObject (readonly)

Returns the value of attribute handler.



7
8
9
# File 'lib/pwrake/branch/worker_communicator.rb', line 7

def handler
  @handler
end

#hostObject (readonly)

Returns the value of attribute host.



7
8
9
# File 'lib/pwrake/branch/worker_communicator.rb', line 7

def host
  @host
end

#idObject (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/pwrake/branch/worker_communicator.rb', line 7

def id
  @id
end

#ncoreObject (readonly)

Returns the value of attribute ncore.



7
8
9
# File 'lib/pwrake/branch/worker_communicator.rb', line 7

def ncore
  @ncore
end

Class Method Details

.read_worker_progs(option) ⇒ Object



9
10
11
12
13
14
15
16
# File 'lib/pwrake/branch/worker_communicator.rb', line 9

def self.read_worker_progs(option)
  d = File.dirname(__FILE__)+'/../worker/'
  code = ""
  option.worker_progs.each do |f|
    code << IO.read(d+f+'.rb')
  end
  code
end

Instance Method Details

#closeObject



54
55
56
57
58
59
# File 'lib/pwrake/branch/worker_communicator.rb', line 54

def close
  if !@closed
    @closed = true
    @handler.close
  end
end

#common_line(s) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/pwrake/branch/worker_communicator.rb', line 80

def common_line(s)
  Log.debug "WorkerCommunicator#common_line(#{s.inspect}) id=#{@id} host=#{@host}"
  case s
  when /^heartbeat$/
    @runner.heartbeat(@handler.ior)
  when /^exited$/
    return false
  when /^log:(.*)$/
    Log.info "worker(#{host})>#{$1}"
  else
    Log.warn "worker(#{host}) out> #{s.inspect}"
  end
  true
end

#ncore_proc(s) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/pwrake/branch/worker_communicator.rb', line 65

def ncore_proc(s)
  if /^ncore:(.*)$/ =~ s
    a = $1
    Log.debug "#{a} @#{@host}"
    if /^(\d+)$/ =~ a
      set_ncore($1.to_i)
      return false
    else
      raise ArgumentError,"WorkerCommunicator#ncore_proc: s=#{s.inspect}"
    end
  else
    return common_line(s)
  end
end

#set_ncore(ncore) ⇒ Object



61
62
63
# File 'lib/pwrake/branch/worker_communicator.rb', line 61

def set_ncore(ncore)
  @ncore = ncore
end

#setup_connection(worker_code) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/pwrake/branch/worker_communicator.rb', line 32

def setup_connection(worker_code)
  rb_cmd = "ruby -e 'eval ARGF.read(#{worker_code.size})'"
  if ['localhost','localhost.localdomain','127.0.0.1'].include? @host
    cmd = "cd; #{rb_cmd}"
  else
    cmd = "ssh -x -T -q #{@option[:ssh_option]} #{@host} \"#{rb_cmd}\""
  end
  Log.debug cmd
  @handler = Handler.new(@runner) do |w0,w1,r2|
    @pid = spawn(cmd,:pgroup=>true,:out=>w0,:err=>w1,:in=>r2)
    w0.close
    w1.close
    r2.close
  end
  @handler.host = @host
  iow = @handler.iow
  iow.write(worker_code)
  Marshal.dump(@ncore,iow)
  Marshal.dump(@option,iow)
  @channel = Channel.new(@handler)
end

#start_default_fiberObject



95
96
97
98
99
100
101
# File 'lib/pwrake/branch/worker_communicator.rb', line 95

def start_default_fiber
  Fiber.new do
    while common_line(@channel.get_line)
    end
    Log.debug "WorkerCommunicator: end of default fiber"
  end.resume
end