Class: Hive::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/hive/worker.rb,
lib/hive/worker/shell.rb

Overview

The generic worker class

Direct Known Subclasses

Shell

Defined Under Namespace

Classes: DeviceNotReady, InvalidJobReservationError, NoPortsAvailable, Shell

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Worker

The main worker process loop



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/hive/worker.rb', line 28

# Build the worker and then run its main loop; the constructor only returns
# once keep_running? reports false.
#
# Keys read from +options+ in this method: 'parent_pid', 'id', 'hive_id',
# 'device_identity', 'name_stub' and 'port_allocator'.
#
# @param options [Hash] worker options (string keys, see above)
def initialize(options)
  @options = options
  @parent_pid = @options['parent_pid']
  @device_id = @options['id']
  @hive_id = @options['hive_id']
  # ||= keeps any value already assigned before this point
  # (presumably by a subclass constructor -- TODO confirm).
  @default_component ||= self.class.to_s
  @hive_mind ||= mind_meld_klass.new(
    url: Chamber.env.network.hive_mind? ? Chamber.env.network.hive_mind : nil,
    pem: Chamber.env.network.cert ? Chamber.env.network.cert : nil,
    ca_file: Chamber.env.network.cafile ? Chamber.env.network.cafile : nil,
    verify_mode: Chamber.env.network.verify_mode ? Chamber.env.network.verify_mode : nil,
    device: hive_mind_device_identifiers
  )
  @device_identity = @options['device_identity'] || 'unknown-device'
  pid = Process.pid
  # Rename the process to "<name_stub or WORKER>.<pid>".
  $PROGRAM_NAME = "#{@options['name_stub'] || 'WORKER'}.#{pid}"
  @log = Hive::Log.new
  @log.add_logger(
    "#{LOG_DIRECTORY}/#{pid}.#{@device_identity}.log",
    Hive.config.logging.worker_level || 'INFO'
  )
  @log.hive_mind = @hive_mind
  @log.default_progname = @default_component

  # Must run after @log and @hive_mind are set: update_queues uses both.
  self.update_queues

  # Use the supplied allocator when the key is present, otherwise one with no ports.
  @port_allocator = (@options.has_key?('port_allocator') ? @options['port_allocator'] : Hive::PortAllocator.new(ports: []))
  
  # Third '::'-separated component of the class name, lower-cased.
  # NOTE(review): for a class name with fewer than three components (e.g.
  # "Hive::Worker") index 2 is nil and .downcase raises NoMethodError.
  platform = self.class.to_s.scan(/[^:][^:]*/)[2].downcase
  @diagnostic_runner = Hive::DiagnosticRunner.new(@options, Hive.config.diagnostics, platform, @hive_mind) if Hive.config.diagnostics? && Hive.config.diagnostics[platform]

  Hive::Messages.configure do |config|
    config.base_path = Hive.config.network.scheduler
    config.pem_file = Hive.config.network.cert
    # NOTE(review): VERIFY_NONE presumably turns off peer certificate
    # verification for Hive::Messages requests -- confirm this is intended.
    config.ssl_verify_mode = OpenSSL::SSL::VERIFY_NONE
  end

  # Idle TERM handler: log and exit (execute_job installs its own while a job runs).
  Signal.trap('TERM') do
    @log.info("Worker terminated")
    exit
  end

  @log.info('Starting worker')
  while keep_running?
    begin
      @log.clear
      update_queues
      # Only look for work when diagnostics returns a truthy value;
      # diagnostics raises DeviceNotReady when the device status is not 'happy'.
      poll_queue if diagnostics
    rescue DeviceNotReady => e
      @log.warn("#{e.message}\n");
    rescue StandardError => e
      # Any other failure is logged and the loop carries on after the sleep.
      @log.warn("Worker loop aborted: #{e.message}\n  : #{e.backtrace.join("\n  : ")}")
    end
    sleep Hive.config.timings.worker_loop_interval
  end
  @log.info('Exiting worker')
end

Instance Attribute Details

#device_apiObject

Device API Object for device associated with this worker



25
26
27
# File 'lib/hive/worker.rb', line 25

# Reader for the @device_api instance variable.
#
# @return [Object, nil] the stored value, or nil when it has never been assigned
def device_api
  instance_variable_get(:@device_api)
end

#queuesObject

Queues that this worker reserves jobs from



25
26
27
# File 'lib/hive/worker.rb', line 25

# Reader for the @queues instance variable.
#
# @return [Object, nil] the stored value, or nil when it has never been assigned
def queues
  instance_variable_get(:@queues)
end

Instance Method Details

#allocate_portObject

Allocate a port



413
414
415
416
417
# File 'lib/hive/worker.rb', line 413

# Deprecated: logs two deprecation warnings, then delegates to
# @port_allocator.allocate_port.
#
# @return [Object] whatever @port_allocator.allocate_port returns
def allocate_port
  [
    "Using deprecated 'Hive::Worker.allocate_port' method",
    "Use @port_allocator.allocate_port instead"
  ].each { |notice| @log.warn(notice) }

  @port_allocator.allocate_port
end

#autogenerated_queuesObject

List of autogenerated queues for the worker



255
256
257
# File 'lib/hive/worker.rb', line 255

# Queues generated by the worker itself. This implementation has none and
# always returns a new empty array.
#
# @return [Array]
def autogenerated_queues
  Array.new
end

#checkout_code(repository, checkout_directory, branch) ⇒ Object

Get a checkout of the repository



378
379
380
# File 'lib/hive/worker.rb', line 378

# Check out the head of +branch+ from +repository+ into +checkout_directory+
# through CodeCache.
#
# @param repository [String] passed to CodeCache.repo
# @param checkout_directory [String] destination given to the checkout call
# @param branch [String] branch given to the checkout call
# @return [Object] the truthy value returned by the checkout call
# @raise [RuntimeError] when the checkout call returns nil or false
def checkout_code(repository, checkout_directory, branch)
  outcome = CodeCache.repo(repository).checkout(:head, checkout_directory, branch)
  raise "Unable to checkout repository #{repository}" unless outcome

  outcome
end

#cleanupObject

Do whatever device cleanup is required



409
410
# File 'lib/hive/worker.rb', line 409

# Device cleanup hook; poll_queue calls it after a job has been executed.
# This implementation does nothing.
#
# @return [nil]
def cleanup
  nil
end

#detect_res_file(results_dir) ⇒ Object



352
353
354
# File 'lib/hive/worker.rb', line 352

# Find a '*.res' file directly inside +results_dir+.
#
# @param results_dir [String] directory to search (interpolated into a glob pattern)
# @return [String, nil] the first match returned by the glob, or nil when there is none
def detect_res_file(results_dir)
  pattern = "#{results_dir}/*.res"
  Dir[pattern].first
end

#device_statusObject

Current state of the device. This method should be replaced in child classes, as appropriate.



244
245
246
# File 'lib/hive/worker.rb', line 244

# Current device status, held in @device_status. A nil or false value is
# replaced with 'happy' on read.
#
# @return [Object] the stored status ('happy' when nothing truthy was stored)
def device_status
  @device_status = 'happy' unless @device_status
  @device_status
end

#diagnosticsObject

Diagnostics function to be extended in child class, as required

Raises:



233
234
235
236
237
238
239
240
# File 'lib/hive/worker.rb', line 233

# Run the diagnostic runner (when one is configured) and check the device status.
#
# A 'busy' status is reset to 'happy' through set_device_status before the check.
#
# @return [Object] true when there is no diagnostic runner, otherwise the
#   value returned by @diagnostic_runner.run
# @raise [DeviceNotReady] when the resulting status is not 'happy'
def diagnostics
  outcome = @diagnostic_runner.nil? ? true : @diagnostic_runner.run

  state = device_status
  state = set_device_status('happy') if state == 'busy'
  unless state == 'happy'
    raise DeviceNotReady.new("Current device status: '#{state}'")
  end

  outcome
end

#execute_jobObject

Execute a job



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/hive/worker.rb', line 128

# Execute the job held in @job.
#
# Steps, in order: install a job-aware TERM handler, prepare the job, build
# and run the execution script, run post-execution cleanup and uploads,
# record the outcome on the job, and re-install the idle TERM handler.
#
# The idle TERM handler is re-installed on BOTH the success and the failure
# path, so a TERM received after a failed job no longer re-runs the job
# cleanup/upload or reports the finished job as 'Worker killed'.
#
# @return [Boolean] true when the execution script's exit value equals 0
# @raise [StandardError] re-raises any error rescued during preparation or
#   script execution, after cleanup and job state updates have run
def execute_job
  # Ensure that a killed worker cleans up correctly
  Signal.trap('TERM') do |_signo|
    Signal.trap('TERM') {} # Prevent retry signals
    @log.info "Caught TERM signal"
    @log.info "Terminating script, if running"
    @script.terminate if @script
    @log.info "Post-execution cleanup"
    signal_safe_post_script(@job, @file_system, @script)

    # Upload results
    @file_system.finalise_results_directory
    upload_files(@job, @file_system.results_path, @file_system.logs_path)
    set_job_state_to :completed
    @job.error('Worker killed')
    @log.info "Worker terminated"
    exit
  end

  @log.info('Job starting')
  @job.prepare(@hive_mind.id)

  exception = nil
  exit_value = nil
  begin
    @log.info "Setting job paths"
    @file_system = Hive::FileSystem.new(@job.job_id, Hive.config.logging.home, @log)
    set_job_state_to :preparing

    if ! @job.repository.to_s.empty?
      @log.info "Checking out the repository"
      @log.debug "  #{@job.repository}"
      @log.debug "  #{@file_system.testbed_path}"
      checkout_code(@job.repository, @file_system.testbed_path, @job.branch)
    end

    @log.info "Initialising execution script"
    @script = Hive::ExecutionScript.new(
      file_system: @file_system,
      log: @log,
      keep_running: ->() { self.keep_running? }
    )
    @script.append_bash_cmd "mkdir -p #{@file_system.testbed_path}/#{@job.execution_directory}"
    @script.append_bash_cmd "cd #{@file_system.testbed_path}/#{@job.execution_directory}"

    @log.info "Setting the execution variables in the environment"
    @script.set_env 'HIVE_RESULTS', @file_system.results_path
    # Array-valued execution variables are not exported as HIVE_* variables.
    @job.execution_variables.to_h.each_pair do |var, val|
      @script.set_env "HIVE_#{var.to_s}".upcase, val if ! val.kind_of?(Array)
    end
    if @job.execution_variables.retry_urns && !@job.execution_variables.retry_urns.empty?
      @script.set_env "RETRY_URNS", @job.execution_variables.retry_urns
    end
    if @job.execution_variables.tests && @job.execution_variables.tests != [""]
      @script.set_env "TEST_NAMES", @job.execution_variables.tests
    end

    @log.info "Appending test script to execution script"
    @script.append_bash_cmd @job.command

    set_job_state_to :running
    @job.start

    @log.info "Pre-execution setup"
    pre_script(@job, @file_system, @script)

    @log.info "Running execution script"
    exit_value = @script.run
    @job.end(exit_value)
  rescue => e
    # Held back so that cleanup and uploads still run; re-raised at the end.
    exception = e
  end

  begin
    @log.info "Post-execution cleanup"
    set_job_state_to :uploading
    post_script(@job, @file_system, @script)

    # Upload results
    @file_system.finalise_results_directory
    upload_files(@job, @file_system.results_path, @file_system.logs_path)
    upload_results(@job, "#{@file_system.testbed_path}/#{@job.execution_directory}", @file_system.results_path)
  rescue => e
    # Post-execution failures are logged only; they do not change the job outcome.
    @log.error( "Post execution failed: " + e.message)
    @log.error("  : #{e.backtrace.join("\n  : ")}")
  end

  if exception
    @job.error( exception.message )
  else
    @job.complete
  end

  # The job is finished either way: go back to the idle TERM handler before
  # the failure is re-raised, so the job-specific handler never outlives the job.
  Signal.trap('TERM') do
    @log.info("Worker terminated")
    exit
  end

  set_job_state_to :completed
  raise exception if exception

  exit_value == 0
end

#hive_mind_device_identifiersObject

Parameters for uniquely identifying the device



441
442
443
# File 'lib/hive/worker.rb', line 441

# Identifiers passed as the +device+ argument when the hive mind client is built.
#
# @return [Hash] a hash with the single key :id holding @device_id
def hive_mind_device_identifiers
  identifiers = {}
  identifiers[:id] = @device_id
  identifiers
end

#job_message_klassObject

Get the correct job class. This should usually be replaced in the child class.



113
114
115
116
# File 'lib/hive/worker.rb', line 113

# Class used to reserve jobs. Logs that the generic class is in use and
# returns Hive::Messages::Job.
#
# @return [Class] Hive::Messages::Job
def job_message_klass
  @log.info('Generic job class')
  generic_klass = Hive::Messages::Job
  generic_klass
end

#keep_running?Boolean

Determine whether to keep the worker running. This just checks the presence of the parent process.

Returns:

  • (Boolean)


384
385
386
387
388
389
390
391
# File 'lib/hive/worker.rb', line 384

# Whether the worker should keep running: true when Process.getpgid succeeds
# for @parent_pid, false when that call raises any StandardError.
#
# @return [Boolean]
def keep_running?
  Process.getpgid(@parent_pid)
  true
rescue
  false
end

#lion_config(checkout) ⇒ Object



373
374
375
# File 'lib/hive/worker.rb', line 373

# Locate '.lion.yml' directly inside +checkout+.
#
# @param checkout [String] directory to look in (interpolated into a glob pattern)
# @return [String, nil] path of the file, or nil when the glob finds nothing
def lion_config(checkout)
  pattern = "#{checkout}/.lion.yml"
  Dir[pattern].first
end

#mind_meld_klassObject



118
119
120
# File 'lib/hive/worker.rb', line 118

# Class that the constructor instantiates as the hive mind client.
#
# @return [Class] MindMeld::Device
def mind_meld_klass
  device_klass = MindMeld::Device
  device_klass
end

#poll_queueObject

Check the queues for work



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/hive/worker.rb', line 87

# Try to reserve a job and, when one is found, execute it and then run cleanup.
#
# The reserved job is stored in @job. A StandardError raised by execute_job
# is logged (message and backtrace) and does not prevent cleanup.
#
# @return [Object] the logger's return value when no job was found,
#   otherwise the return value of cleanup
def poll_queue
  @job = reserve_job
  return @log.info('No job found') if @job.nil?

  @log.info('Job starting')
  begin
    execute_job
  rescue => failure
    @log.info("Error running test: #{failure.message}\n : #{failure.backtrace.join("\n :")}")
  end
  cleanup
end

#post_script(job, file_system, script) ⇒ Object

Any device specific steps immediately after the execution script



398
399
400
# File 'lib/hive/worker.rb', line 398

# Device-specific steps run immediately after the execution script.
# This implementation only forwards its arguments to signal_safe_post_script.
#
# @param job [Object] forwarded unchanged
# @param file_system [Object] forwarded unchanged
# @param script [Object] forwarded unchanged
# @return [Object] whatever signal_safe_post_script returns
def post_script(job, file_system, script)
  signal_safe_post_script(
    job,
    file_system,
    script
  )
end

#pre_script(job, file_system, script) ⇒ Object

Any setup required before the execution script



394
395
# File 'lib/hive/worker.rb', line 394

# Setup hook; execute_job calls it just before the execution script is run.
# This implementation does nothing.
#
# @param job [Object] unused here
# @param file_system [Object] unused here
# @param script [Object] unused here
# @return [nil]
def pre_script(job, file_system, script)
  nil
end

#process_xunit_results(results_dir) ⇒ Object



356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/hive/worker.rb', line 356

# Convert the first '*.xml' file in +results_dir+ into '<results_dir>/xunit.res'.
#
# The XML file is handed to Res.parse_results (parser :junit); the file named
# by the returned object's +io+ is read in binary mode and its content is
# written to xunit.res. Both files are opened in block form so the handles
# are closed even when reading or writing raises, and the directory is
# globbed only once.
#
# @param results_dir [String] directory holding the xunit XML output
# @return [File, nil] the (already closed) File for xunit.res, or nil when
#   the directory has no '*.xml' file
def process_xunit_results(results_dir)
  xml_file = Dir.glob("#{results_dir}/*.xml").first
  return if xml_file.nil?

  xunit_output = Res.parse_results(parser: :junit, file: xml_file)
  contents = File.open(xunit_output.io, 'rb', &:read)

  File.open("#{results_dir}/xunit.res", 'w+') do |res|
    res.puts contents
    res # callers receive the File object, as before; it is closed on block exit
  end
end

#release_all_portsObject

Release all ports



427
428
429
430
431
# File 'lib/hive/worker.rb', line 427

# Deprecated: logs two deprecation warnings, then delegates to
# @port_allocator.release_all_ports.
#
# @return [Object] whatever @port_allocator.release_all_ports returns
def release_all_ports
  [
    "Using deprecated 'Hive::Worker.release_all_ports' method",
    "Use @port_allocator.release_all_ports instead"
  ].each { |notice| @log.warn(notice) }

  @port_allocator.release_all_ports
end

#release_port(p) ⇒ Object

Release a port



420
421
422
423
424
# File 'lib/hive/worker.rb', line 420

# Deprecated: logs two deprecation warnings, then delegates to
# @port_allocator.release_port.
#
# @param p [Object] the port, passed through unchanged
# @return [Object] whatever @port_allocator.release_port returns
def release_port(p)
  [
    "Using deprecated 'Hive::Worker.release_port' method",
    "Use @port_allocator.release_port instead"
  ].each { |notice| @log.warn(notice) }

  @port_allocator.release_port(p)
end

#reservation_detailsObject



122
123
124
125
# File 'lib/hive/worker.rb', line 122

# Details sent along with a job reservation; also written to the debug log.
#
# @return [Hash] :hive_id (@hive_id), :worker_pid (current process id) and
#   :device_id (@hive_mind.id)
def reservation_details
  @log.debug(
    format(
      'Reservations details: hive_id=%s, worker_pid=%s, device_id=%s',
      @hive_id, Process.pid, @hive_mind.id
    )
  )

  details = {}
  details[:hive_id] = @hive_id
  details[:worker_pid] = Process.pid
  details[:device_id] = @hive_mind.id
  details
end

#reserve_jobObject

Try to find and reserve a job



103
104
105
106
107
108
109
# File 'lib/hive/worker.rb', line 103

# Ask the job class to reserve a job from @queues for this worker.
#
# @return [Object, nil] the reserved job, or nil when nothing was reserved
# @raise [InvalidJobReservationError] when a job is returned but its valid?
#   check is falsy
def reserve_job
  queue_list = @queues.join(', ')
  @log.info "Trying to reserve job for queues: #{queue_list}"

  reserved = job_message_klass.reserve(@queues, reservation_details)
  @log.debug "Job: #{reserved.inspect}"

  if !reserved.nil? && !reserved.valid?
    raise InvalidJobReservationError.new("Invalid Job Reserved")
  end

  reserved
end

#set_device_status(status) ⇒ Object

Set the status of a device. This method should be replaced in child classes, as appropriate.



250
251
252
# File 'lib/hive/worker.rb', line 250

# Store +status+ in @device_status.
#
# @param status [Object] the new status
# @return [Object] the status that was stored
def set_device_status(status)
  instance_variable_set(:@device_status, status)
end

#set_job_state_to(state) ⇒ Object

Set job info file



434
435
436
437
438
# File 'lib/hive/worker.rb', line 434

# Overwrite '<@file_system.home_path>/job_info' with a single line of the
# form "<pid> <state>".
#
# @param state [#to_s] the state to record
# @return [nil]
def set_job_state_to(state)
  info_path = "#{@file_system.home_path}/job_info"
  File.open(info_path, 'w') { |info| info.puts "#{Process.pid} #{state}" }
end

#signal_safe_post_script(job, file_system, script) ⇒ Object

Any device-specific steps immediately after the execution script that can safely be run inside a Signal.trap. This should be called by post_script.



405
406
# File 'lib/hive/worker.rb', line 405

# Post-execution hook; called from post_script and from the TERM handler that
# execute_job installs. This implementation does nothing.
#
# @param job [Object] unused here
# @param file_system [Object] unused here
# @param script [Object] unused here
# @return [nil]
def signal_safe_post_script(job, file_system, script)
  nil
end

#testmine_config(checkout) ⇒ Object



369
370
371
# File 'lib/hive/worker.rb', line 369

# Locate '.testmine.yml' or '.testmite.yml' directly inside +checkout+.
#
# @param checkout [String] directory to look in (interpolated into a glob pattern)
# @return [String, nil] first path returned by the glob, or nil when neither file exists
def testmine_config(checkout)
  pattern = "#{checkout}/.testmi{n,t}e.yml"
  Dir[pattern].first
end

#update_queue_logObject



268
269
270
# File 'lib/hive/worker.rb', line 268

# Dump @queues as YAML to '<LOG_DIRECTORY>/<pid>.queues.yml', replacing any
# previous content.
#
# @return [Integer] number of bytes written
def update_queue_log
  queue_log = "#{LOG_DIRECTORY}/#{Process.pid}.queues.yml"
  File.write(queue_log, @queues.to_yaml)
end

#update_queuesObject



259
260
261
262
263
264
265
266
# File 'lib/hive/worker.rb', line 259

# Rebuild @queues from the autogenerated queues followed by the queues
# reported by the hive mind (duplicates removed, first occurrence kept),
# then write the list out through update_queue_log.
#
# @return [Object] whatever update_queue_log returns
def update_queues
  @log.debug("Getting queues from Hive Mind")

  generated = autogenerated_queues
  # The +true+ argument is passed straight to hive_queues; presumably it
  # forces a refresh -- TODO confirm against MindMeld.
  from_hive_mind = @hive_mind.hive_queues(true)
  @queues = generated | from_hive_mind

  @log.debug("hive queues: #{@hive_mind.hive_queues}")
  @log.debug("Full list of queues: #{@queues}")
  update_queue_log
end

#upload_files(job, *paths) ⇒ Object

Upload any files from the test



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/hive/worker.rb', line 273

# Upload every entry of each directory in +paths+ as a job artifact.
#
# Uploading is best-effort: a failure for one artifact is logged and the
# remaining entries are still processed, and a directory that cannot be read
# (for example because it does not exist) is logged and skipped instead of
# aborting the remaining paths. The '.' and '..' entries are skipped before
# anything is logged for them.
#
# @param job [#report_artifact] receives the full path of each entry
# @param paths [Array<String>] directories whose entries are uploaded
# @return [Array<String>] the +paths+ array
def upload_files(job, *paths)
  @log.info("Uploading assets")
  paths.each do |path|
    @log.info("Uploading files from #{path}")
    begin
      Dir.foreach(path) do |item|
        next if item == '.' || item == '..'

        @log.info("File: #{item}")
        begin
          artifact = job.report_artifact("#{path}/#{item}")
          @log.info("Artifact uploaded: #{artifact.attributes.to_s}")
        rescue => e
          @log.error("Error uploading artifact #{item}: #{e.message}")
          @log.error("  : #{e.backtrace.join("\n  : ")}")
        end
      end
    rescue SystemCallError => e
      # Errno::* errors from reading the directory itself
      @log.error("Error reading directory #{path}: #{e.message}")
    end
  end
end

#upload_results(job, checkout, results_dir) ⇒ Object

Upload results



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/hive/worker.rb', line 292

# Submit the job's results through Res.
#
# The results file is the '*.res' file found by detect_res_file or, failing
# that, the output of process_xunit_results. When neither yields a file
# nothing is submitted. Otherwise three independent submissions are
# attempted, each logging a warning on failure without stopping the others:
# the :hive reporter always, the :testmine reporter when testmine_config
# finds a config file, and the :lion reporter when lion_config finds one.
#
# @param job [Object] supplies job_id and execution_variables (version, queue_name)
# @param checkout [String] directory searched for the reporter config files
# @param results_dir [String] directory searched for the results file
# @return [Object, nil] nil when there is no results file, otherwise the
#   value of the final (lion) step
def upload_results(job, checkout, results_dir)
  res_file = detect_res_file(results_dir) || process_xunit_results(results_dir)
  return unless res_file

  @log.info("Res file found")

  begin
    Res.submit_results(reporter: :hive, ir: res_file, job_id: job.job_id)
  rescue => hive_failure
    @log.warn("Res Hive upload failed #{hive_failure.message}")
  end

  begin
    testmine_file = testmine_config(checkout)
    if testmine_file
      Res.submit_results(
        reporter: :testmine,
        ir: res_file,
        config_file: testmine_file,
        hive_job_id: job.job_id,
        version: job.execution_variables.version,
        target: job.execution_variables.queue_name
      )
    end
  rescue => testmine_failure
    @log.warn("Res Testmine upload failed #{testmine_failure.message}")
  end

  # TODO Add in Testrail upload

  begin
    lion_file = lion_config(checkout)
    if lion_file
      Res.submit_results(
        reporter: :lion,
        ir: res_file,
        config_file: lion_file,
        hive_job_id: job.job_id,
        version: job.execution_variables.version,
        target: job.execution_variables.queue_name,
        cert: Chamber.env.network.cert,
        cacert: Chamber.env.network.cafile,
        ssl_verify_mode: Chamber.env.network.verify_mode
      )
    end
  rescue => lion_failure
    @log.warn("Res Lion upload failed #{lion_failure.message}")
  end
end