Class: LoadNode

Inherits:
Object
  • Object
show all
Defined in:
lib/load/load_node.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(run_id, server_address, node_code = nil) ⇒ LoadNode

Returns a new instance of LoadNode.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/load/load_node.rb', line 19

def initialize(run_id, server_address, node_code = nil)
  if (@@load_node_instance != nil)
    raise "LoadNode instance initialized a second time"
  end
  @target_code = target_code
  @test_index = 1
  @messenger = nil
  @@load_node_instance = self
  @factory = TestFactory.new
  if (server_address == nil)
    # Perf tests don't require server
  else
    begin
      puts "Registering node"
      @run_id = run_id
      @child_pids = []
      @messenger = HttpReport.new(server_address)
      node_info = @messenger.node_register({run_id: @run_id, address: "#{Socket.gethostname}", code: node_code})
      if (node_info['configuration'] == nil)
        puts "Error starting node.  Did run already complete?"
      else  
        @configuration = eval(node_info['configuration'])
        @node_id = node_info["id"].to_i
        @node_code = node_info["code"]
        @target_code = node_info["target_server"]
        @target_server = @configuration[:servers][@target_code.to_sym]
        puts "Node registered: #{@node_id}   Code: #{@node_code}   Target server: #{@target_server}"
        @schedule = @messenger.node_schedule({node_id: @node_id})
        @load_tests = create_node_tests(@node_id, @schedule)
        @duration_millis = node_info["duration"]
        @messenger.node_ready({node_id: @node_id})
        wait_for_node_start_ok
        launch_node_tests
        wait_for_finish
        @messenger.node_finished({node_id: @node_id})
      end
    rescue Interrupt => e
      puts "NODE (#{@node_id}) - Interrupt signal received, quitting.  [#{self.class}]"
    rescue Exception => exc
      puts "Exception in node: #{exc.message} \n" + exc.backtrace.join("\n")
    ensure
      kill_child_processes
    end
    #exit 0
  end
end

Instance Attribute Details

#duration_millisObject (readonly)

Returns the value of attribute duration_millis.



12
13
14
# File 'lib/load/load_node.rb', line 12

def duration_millis
  @duration_millis
end

#node_idObject (readonly)

Returns the value of attribute node_id.



12
13
14
# File 'lib/load/load_node.rb', line 12

def node_id
  @node_id
end

#scheduleObject (readonly)

Returns the value of attribute schedule.



12
13
14
# File 'lib/load/load_node.rb', line 12

def schedule
  @schedule
end

#target_codeObject

Returns the value of attribute target_code.



13
14
15
# File 'lib/load/load_node.rb', line 13

def target_code
  @target_code
end

#target_serverObject

Returns the value of attribute target_server.



13
14
15
# File 'lib/load/load_node.rb', line 13

def target_server
  @target_server
end

Class Method Details

.instanceObject



15
16
17
# File 'lib/load/load_node.rb', line 15

def self.instance
  return @@load_node_instance
end

Instance Method Details

#create_node_tests(node_id, node_schedule) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/load/load_node.rb', line 131

def create_node_tests(node_id, node_schedule)
  wasp_id = 1
  load_tests = []

  puts "Node Schedule:"
  node_schedule.each do |test_schedule|
    test_name = test_schedule["test_name"]
    events = test_schedule["events"]
    sched_text = "Test: #{test_name}  Events: "
    events.each do |event|
      sched_text += "#{event['action'].capitalize} at #{event['time']} sec     "
    end
    test_config = find_config_for_test(test_name)
    test = @factory.create_test(test_name, @target_code, test_config)
    test.index = @test_index
    @test_index += 1
    
    load_test = LoadTest.new(self, @run_id, node_id, wasp_id, test, events, @messenger)
    load_tests << load_test
    wasp_id += 1
  end
  
  node_monitors = @messenger.node_monitors( {node_id: @node_id} )
  node_monitors.each do |monitor_config|
    type = monitor_config["type"]
    name = monitor_config["name"]
    duration = monitor_config["duration"].to_i / 1000
    if (name == nil)
      name = "#{type.capitalize} Monitor"
    end
    
    if (type == "network")
      puts "Config: #{monitor_config}"
      address = monitor_config["address"]
      monitor = NetworkMonitor.new(address)
    elsif (type == "cpu")
      monitor = CpuMonitor.new()
    elsif (type == "file.io")
      monitor = FileIoMonitor.new()
    elsif (type == "ram")
      monitor = RamMonitor.new()
    else
      raise "Do no know how to handle monitor type: #{type}   Name: #{name}"
    end
    
    events = [{"time" => "0", "action"  => "run"}, {"time" => "#{duration}", "action" => "pause"}]
    load_test = LoadTest.new(self, @run_id, node_id, wasp_id, monitor, events, @messenger)
    load_tests << load_test
    wasp_id += 1
  end
  
  return load_tests
end

#find_config_for_test(test_name) ⇒ Object



185
186
187
188
189
190
191
192
# File 'lib/load/load_node.rb', line 185

def find_config_for_test(test_name)
  test_configs = @configuration[:tests]
  test_configs.each do |test_config|
    if (test_config[:test] == test_name)
      return test_config
    end
  end
end

#kill_child_processesObject



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/load/load_node.rb', line 195

def kill_child_processes
  # puts "=================================================================================="
  # puts "Node Finished: #{@node_id}"
  # puts "  Killing child PID's:  #{@child_pids}"
  # puts "=================================================================================="

  # trap("INT") do
  #   exit
  # end
  Process.kill('INT', -Process.getpgrp)
  # 
  # @child_pids.each do |pid|
  #   begin
  #     puts "Killing: #{pid}"
  #     Process.kill("INT", pid)
  #   rescue Exception => e
  #     puts "Exception killing pid #{pid}.  #{e.message}"
  #   end
  # end
end

#launch_node_testsObject



77
78
79
80
81
82
83
84
85
# File 'lib/load/load_node.rb', line 77

def launch_node_tests
  @load_tests.each do |test|
    pid = fork {
      test.run
    }
    @child_pids << pid
    write_pid_to_file(pid)
  end
end

#report_block_result(test_code, wasp_id, time_ellapsed, benchmark_time, result) ⇒ Object



66
67
68
# File 'lib/load/load_node.rb', line 66

def report_block_result(test_code, wasp_id, time_ellapsed, benchmark_time, result)
  @messenger.report_result(@run_id, @node_id, test_code, wasp_id, time_ellapsed, benchmark_time, result)
end

#wait_for_finishObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/load/load_node.rb', line 103

def wait_for_finish
    @start_time = Time.now.to_i
    quit = false
    while !quit
      sleep 10
      ellapsed_time = Time.now.to_i - @start_time
      if (ellapsed_time > (@duration_millis / 1000))
        quit = true
        puts "Node duration reached: #{@duration_millis} seconds.  Node quitting"
      else
        #puts "Node duration (#{ellapsed_time} secs) NOT reached: #{@duration_millis/1000} seconds."
      end
      @messenger.node_schedule({node_id: @node_id})
      
      status = @messenger.status_for_test(@run_id, nil)
      if (status['run_status'] == "killed")
        puts "XXXXX  RUN HAS BEEN KILLED - KILLING NODE: #{@wasp_id}  XXXXXXX"
        quit = true
      elsif (status['run_status'] == "finished")
        puts "XXXXX  RUN IS FINISHED - KILLING NODE: #{@wasp_id}  XXXXXXX"
        quit = true
      end
      
    end
    puts "Node finished.  ##{@node_code}"
end

#wait_for_node_start_okObject



88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/load/load_node.rb', line 88

def wait_for_node_start_ok
  ok_to_start = false
  last_message_at = 0
  while (!ok_to_start)
    ok_to_start = @messenger.node_start?({node_id: @node_id})
    sleep 0.2
    if (Time.new.to_i - last_message_at > 5)
      puts "Node waiting for other nodes to start (#{@node_code})"
      last_message_at = Time.new.to_i
    end
  end
  puts "All Nodes Launched.  Node (#{@node_code}) can start now "
end

#write_pid_to_file(pid) ⇒ Object



71
72
73
74
75
# File 'lib/load/load_node.rb', line 71

def write_pid_to_file(pid)  
  open('node_pids_#{@node_id}.pid', 'a') { |f|
    f.puts "#{pid}\n"
  }
end