Class: LoadNode

Inherits:
Object
  • Object
show all
Defined in:
lib/load/load_node.rb

Constant Summary collapse

@@load_node_instance =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(run_id, server_address, node_code = nil) ⇒ LoadNode

Returns a new instance of LoadNode.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/load/load_node.rb', line 20

# Builds the process-wide LoadNode and, when a server address is supplied,
# drives the node through its whole lifecycle: register, fetch the schedule,
# build the tests, wait for the start signal, launch, wait for completion and
# report that the node has finished.
#
# run_id         - run identifier; only stored when a server address is given.
# server_address - handed to HttpReport.new; nil skips all server interaction.
# node_code      - optional code sent with the registration request.
#
# Raises RuntimeError when a LoadNode has already been constructed.
def initialize(run_id, server_address, node_code = nil)
  raise "LoadNode instance initialized a second time" unless @@load_node_instance.nil?

  # Goes through the target_code reader, which hands back the still-unset
  # @target_code.
  @target_code = target_code
  @test_index = 1
  @messenger = nil
  @@load_node_instance = self
  @factory = TestFactory.new

  # Perf tests don't require server
  return if server_address.nil?

  begin
    puts "Registering node"
    @run_id = run_id
    @child_pids = []
    @messenger = HttpReport.new(server_address)
    registration = {run_id: @run_id, address: Socket.gethostname.to_s, code: node_code}
    node_info = @messenger.node_register(registration)
    if node_info['configuration'].nil?
      puts "Error starting node.  Did run already complete?"
    else
      # NOTE(review): the configuration text returned by the server is
      # evaluated as Ruby code here - confirm the server is trusted.
      @configuration = eval(node_info['configuration'])
      @node_id = node_info["id"].to_i
      @node_code = node_info["code"]
      @target_code = node_info["target_server"]
      @target_server = @configuration[:servers][@target_code.to_sym]
      puts "Node registered: #{@node_id}   Code: #{@node_code}   Target server: #{@target_server}"
      @schedule = @messenger.node_schedule({node_id: @node_id})
      @load_tests = create_node_tests(@node_id, @schedule)
      @duration_millis = node_info["duration"]
      @messenger.node_ready({node_id: @node_id})
      wait_for_node_start_ok
      launch_node_tests
      wait_for_finish
      @messenger.node_finished({node_id: @node_id})
    end
  rescue Interrupt
    puts "NODE (#{@node_id}) - Interrupt signal received, quitting.  [#{self.class}]"
  rescue Exception => error
    puts "Exception in node: #{error.message} \n#{error.backtrace.join("\n")}"
  ensure
    # Runs on every path through the server branch, including early failure.
    kill_child_processes
  end
  #exit 0
end

Instance Attribute Details

#duration_millis ⇒ Object (readonly)

Returns the value of attribute duration_millis.



13
14
15
# File 'lib/load/load_node.rb', line 13

# Reader for @duration_millis. The constructor fills it from the "duration"
# entry of the registration response; it is nil before that happens.
def duration_millis
  @duration_millis
end

#node_id ⇒ Object (readonly)

Returns the value of attribute node_id.



13
14
15
# File 'lib/load/load_node.rb', line 13

# Reader for @node_id. The constructor sets it to the integer form of the
# "id" entry in the registration response; it is nil before that happens.
def node_id
  @node_id
end

#schedule ⇒ Object (readonly)

Returns the value of attribute schedule.



13
14
15
# File 'lib/load/load_node.rb', line 13

# Reader for @schedule, the value the constructor obtains from the
# messenger's node_schedule call; nil until that call has been made.
def schedule
  @schedule
end

#target_code ⇒ Object

Returns the value of attribute target_code.



14
15
16
# File 'lib/load/load_node.rb', line 14

# Reader for @target_code. When a server address is given, the constructor
# sets it from the "target_server" entry of the registration response.
def target_code
  @target_code
end

#target_server ⇒ Object

Returns the value of attribute target_server.



14
15
16
# File 'lib/load/load_node.rb', line 14

# Reader for @target_server. When a server address is given, the constructor
# looks it up in the configuration's :servers table under the target code.
def target_server
  @target_server
end

Class Method Details

.instance ⇒ Object



16
17
18
# File 'lib/load/load_node.rb', line 16

# Hands back whatever is stored in @@load_node_instance: the LoadNode
# recorded by the constructor, or nil when none has been built yet.
def self.instance
  @@load_node_instance
end

Instance Method Details

#create_node_tests(node_id, node_schedule) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/load/load_node.rb', line 132

# Builds one LoadTest per scheduled test and one per monitor configured for
# this node, numbering them with consecutive wasp ids starting at 1.
#
# node_id       - id handed to every LoadTest that is created.
# node_schedule - enumerable of hashes with "test_name" and "events" keys;
#                 each event is a hash with "action" and "time" keys.
#
# Prints a "Node Schedule:" header followed by one summary line per
# scheduled test.
#
# Returns the Array of LoadTest objects, scheduled tests first.
# Raises RuntimeError for a monitor type other than "network", "cpu",
# "file.io" or "ram".
def create_node_tests(node_id, node_schedule)
  wasp_id = 1
  load_tests = []

  puts "Node Schedule:"
  node_schedule.each do |test_schedule|
    test_name = test_schedule["test_name"]
    events = test_schedule["events"]
    event_summary = events.map { |event| "#{event['action'].capitalize} at #{event['time']} sec     " }.join
    # The summary used to be assembled and then dropped; print it so the
    # header above is actually followed by the schedule.
    puts "Test: #{test_name}  Events: #{event_summary}"

    test = @factory.create_test(test_name, @target_code, find_config_for_test(test_name))
    test.index = @test_index
    @test_index += 1

    load_tests << LoadTest.new(self, @run_id, node_id, wasp_id, test, events, @messenger)
    wasp_id += 1
  end

  @messenger.node_monitors({node_id: @node_id}).each do |monitor_config|
    type = monitor_config["type"]
    # to_s keeps a missing type from raising NoMethodError here, so the
    # descriptive error below is reached instead.
    name = monitor_config["name"] || "#{type.to_s.capitalize} Monitor"

    monitor =
      case type
      when "network"
        puts "Config: #{monitor_config}"
        NetworkMonitor.new(monitor_config["address"])
      when "cpu"
        CpuMonitor.new
      when "file.io"
        FileIoMonitor.new
      when "ram"
        RamMonitor.new
      else
        raise "Do not know how to handle monitor type: #{type}   Name: #{name}"
      end

    # The configured duration is divided by 1000 before being used as the
    # pause time of the monitor's events.
    duration = monitor_config["duration"].to_i / 1000
    events = [{"time" => "0", "action" => "run"}, {"time" => "#{duration}", "action" => "pause"}]
    load_tests << LoadTest.new(self, @run_id, node_id, wasp_id, monitor, events, @messenger)
    wasp_id += 1
  end

  load_tests
end

#find_config_for_test(test_name) ⇒ Object



186
187
188
189
190
191
192
193
# File 'lib/load/load_node.rb', line 186

# Looks up the configuration entry for a test.
#
# test_name - value compared against each entry's :test key.
#
# Returns the first entry of @configuration[:tests] whose :test equals
# test_name, or nil when there is no such entry (or no :tests list at all).
def find_config_for_test(test_name)
  # `find` yields nil on a miss; the previous `each` loop fell through and
  # returned the entire list of configs instead.
  (@configuration[:tests] || []).find { |test_config| test_config[:test] == test_name }
end

#kill_child_processes ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/load/load_node.rb', line 196

# Sends INT to the process group this process belongs to, by passing the
# negated group id to Process.kill. The current process is a member of that
# group; presumably the children forked by launch_node_tests are too
# (TODO confirm they never change group).
#
# @child_pids is not consulted: an earlier per-pid kill loop was disabled in
# favour of the single group-wide signal.
def kill_child_processes
  own_group = Process.getpgrp
  Process.kill('INT', -own_group)
end

#launch_node_tests ⇒ Object



78
79
80
81
82
83
84
85
86
# File 'lib/load/load_node.rb', line 78

# Forks one child process per entry in @load_tests; each child calls `run`
# on its entry. Every child's pid is appended to @child_pids and also passed
# to write_pid_to_file.
def launch_node_tests
  @load_tests.each do |load_test|
    child_pid = fork { load_test.run }
    @child_pids << child_pid
    write_pid_to_file(child_pid)
  end
end

#report_block_result(test_code, runner_id, time_ellapsed, benchmark_time, result) ⇒ Object



67
68
69
# File 'lib/load/load_node.rb', line 67

# Forwards one block result to the messenger's report_result, prefixing the
# caller's five values with this node's @run_id and @node_id.
#
# Returns whatever report_result returns.
def report_block_result(test_code, runner_id, time_ellapsed, benchmark_time, result)
  measurement = [test_code, runner_id, time_ellapsed, benchmark_time, result]
  @messenger.report_result(@run_id, @node_id, *measurement)
end

#wait_for_finish ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/load/load_node.rb', line 104

# Blocks until the node should stop, polling every 10 seconds.
#
# The loop ends when any of these holds after a sleep:
# * more than @duration_millis / 1000 seconds have passed since entry,
# * the run status reported by the messenger is "killed",
# * the run status reported by the messenger is "finished".
#
# Records the entry time (epoch seconds) in @start_time. On every poll it
# also calls the messenger's node_schedule and discards the result.
def wait_for_finish
  @start_time = Time.now.to_i
  duration_secs = @duration_millis / 1000
  finished = false

  until finished
    sleep 10

    if Time.now.to_i - @start_time > duration_secs
      finished = true
      # Report the limit in the unit the message names (it used to print the
      # millisecond value labelled as seconds).
      puts "Node duration reached: #{duration_secs} seconds.  Node quitting"
    end

    # NOTE(review): result unused - presumably a keep-alive; confirm.
    @messenger.node_schedule({node_id: @node_id})

    status = @messenger.status_for_test(@run_id, nil)
    # These messages used to interpolate @wasp_id, which this class never
    # assigns; @node_id is the identifier this node actually has.
    case status['run_status']
    when "killed"
      puts "XXXXX  RUN HAS BEEN KILLED - KILLING NODE: #{@node_id}  XXXXXXX"
      finished = true
    when "finished"
      puts "XXXXX  RUN IS FINISHED - KILLING NODE: #{@node_id}  XXXXXXX"
      finished = true
    end
  end

  puts "Node finished.  ##{@node_code}"
end

#wait_for_node_start_ok ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/load/load_node.rb', line 89

# Polls the messenger's node_start? until it answers truthy, sleeping 0.2
# seconds after every poll (including the successful one). While polling, a
# "waiting" line is printed whenever more than 5 seconds have passed since
# the previous one; the first poll always prints it.
def wait_for_node_start_ok
  last_notice_at = 0
  loop do
    cleared = @messenger.node_start?({node_id: @node_id})
    sleep 0.2
    if Time.new.to_i - last_notice_at > 5
      puts "Node waiting for other nodes to start (#{@node_code})"
      last_notice_at = Time.new.to_i
    end
    break if cleared
  end
  puts "All Nodes Launched.  Node (#{@node_code}) can start now "
end

#write_pid_to_file(pid) ⇒ Object



72
73
74
75
76
# File 'lib/load/load_node.rb', line 72

# Appends a pid, followed by a newline, to the file "node_pids_<node id>.pid"
# in the current working directory, creating the file when needed.
#
# pid - the value to record; written via its string form.
def write_pid_to_file(pid)
  # Double quotes are required for @node_id to be interpolated; with the
  # previous single-quoted literal the file name contained the text
  # "#{@node_id}" verbatim.
  File.open("node_pids_#{@node_id}.pid", 'a') { |file| file.puts pid }
end