Class: Ripe::WorkerController

Inherits:
Object
Defined in:
lib/ripe/worker_controller.rb,
lib/ripe/worker_controller/syncer.rb,
lib/ripe/worker_controller/preparer.rb

Overview

This class controls workers and their interaction with the compute cluster: worker preparation, submission, cancellation, and synchronization.

Defined Under Namespace

Classes: Preparer, Syncer

Instance Method Details

#cancel(workers) ⇒ Array<DB::Worker>

Cancel worker jobs in the compute cluster system.

Parameters:

  • workers (DB::Worker, Array<DB::Worker>)

    a single worker or a list of workers

Returns:

  • (Array<DB::Worker>)

    the list of workers given in arguments, with modified states



# File 'lib/ripe/worker_controller.rb', line 84

def cancel(workers)
  distribute workers do |worker|
    if ['queueing', 'idle', 'blocked', 'active'].include? worker.status
      `canceljob #{worker.moab_id}`
      worker.update(status: :cancelled)
    else
      puts "Worker #{worker.id} could not be cancelled: not started"
    end
  end
end
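The status guard in #cancel can be tried in isolation. The sketch below is only an illustration: `CANCELLABLE_STATUSES` and `cancellable?` are hypothetical names that do not exist in Ripe, and it assumes `worker.status` is returned as a String, as the comparison in the source suggests.

```ruby
# Statuses for which #cancel issues `canceljob`, copied from the source above.
CANCELLABLE_STATUSES = ['queueing', 'idle', 'blocked', 'active'].freeze

# Hypothetical helper (not part of Ripe): true when a worker in this
# status would be cancelled by #cancel.
def cancellable?(status)
  CANCELLABLE_STATUSES.include?(status)
end

cancellable?('queueing')  # => true
cancellable?('prepared')  # => false
cancellable?(:queueing)   # => false, a Symbol never equals a String
```

The last call shows why the type of `worker.status` matters: statuses are written as Symbols in `worker.update` but compared as Strings here.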

#distribute(workers, &block) ⇒ Array<DB::Worker>

Apply a block to a list of workers.

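Parameters:

  • workers (DB::Worker, Array<DB::Worker>)

    a single worker or a list of workers

  • block (Proc)

    the block to call with each worker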

Returns:

  • (Array<DB::Worker>)

    the list of workers given in arguments, with modified states



# File 'lib/ripe/worker_controller.rb', line 35

def distribute(workers, &block)
  workers = [workers] if workers.is_a? DB::Worker
  workers.map do |w|
    block.call(w)
    w
  end
end
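As a rough illustration of how #distribute normalizes its argument, the sketch below reproduces the method against a stand-in class. `Worker` here is a hypothetical Struct used in place of `DB::Worker`, so that the example runs without a database.

```ruby
# Hypothetical stand-in for DB::Worker, for illustration only.
Worker = Struct.new(:id, :status)

# Same shape as #distribute above, checking against the stand-in class.
def distribute(workers, &block)
  workers = [workers] if workers.is_a? Worker  # wrap a single worker in an Array
  workers.map do |w|
    block.call(w)
    w  # return the worker itself, not the block's result
  end
end

# A single worker and a list of workers are both accepted.
single = distribute(Worker.new(1, :prepared)) { |w| w.status = :queueing }
batch  = distribute([Worker.new(2, :prepared), Worker.new(3, :prepared)]) do |w|
  w.status = :queueing
end
```

In both calls the return value is an Array of the workers that were passed in, which is why #cancel, #local and #start can all return the result of `distribute` directly.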

#edit(*args) ⇒ void

This method returns an undefined value.

Launch an interactive text editor from the console.



# File 'lib/ripe/worker_controller.rb', line 121

def edit(*args)
  system("$EDITOR #{args.join(' ')}")
end

#list(n = 20) ⇒ Array<DB::Worker>

List the n most recent workers.

Parameters:

  • n (Integer) (defaults to: 20)

    the number of most recent workers

Returns:

  • (Array<DB::Worker>)

    the list of n most recent workers



# File 'lib/ripe/worker_controller.rb', line 112

def list(n = 20)
  DB::Worker.last(n)
end

#local(workers) ⇒ Array<DB::Worker>

Run worker job code locally in bash.

Parameters:

  • workers (DB::Worker, Array<DB::Worker>)

    a single worker or a list of workers

Returns:

  • (Array<DB::Worker>)

    the list of workers given in arguments, with modified states



# File 'lib/ripe/worker_controller.rb', line 49

def local(workers)
  distribute workers do |worker|
    worker.update(status: :active_local)

    `bash #{worker.sh}`
    exit_code = $?.to_i

    worker.update(status:    :completed,
                  exit_code: exit_code)
  end
end
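One detail of the listing above is worth checking when reading stored exit codes: `$?` is a `Process::Status`, and its `to_i` returns the raw, platform-dependent status bits rather than the value the script passed to `exit`. The sketch below, which assumes a POSIX `sh` is available, contrasts it with `exitstatus`.

```ruby
# Run a shell command that exits with status 3.
`sh -c 'exit 3'`
status = $?

raw_status = status.to_i        # raw wait status bits, platform dependent
exit_code  = status.exitstatus  # the value passed to `exit`, here 3
```

If the recorded `exit_code` is meant to match the script's own exit value, `exitstatus` is the accessor that provides it.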

#prepare(workflow, samples, params = {}) ⇒ Array<Worker>

Prepare workers by applying the workflow callback and its parameters to each sample.

Parameters:

  • workflow (String)

    the name of a workflow to apply on the sample list

  • samples (Array<String>)

    list of samples to apply the callback to

  • params (Hash<Symbol, String>) (defaults to: {})

    a list of worker-wide parameters

Returns:

  • (Array<Worker>)

    workers prepared in current batch



# File 'lib/ripe/worker_controller.rb', line 23

def prepare(workflow, samples, params = {})
  Preparer.new(workflow, samples, params).workers
end

#start(workers) ⇒ Array<DB::Worker>

Submit worker jobs to the compute cluster system.

Parameters:

  • workers (DB::Worker, Array<DB::Worker>)

    a single worker or a list of workers

Returns:

  • (Array<DB::Worker>)

    the list of workers given in arguments, with modified states



# File 'lib/ripe/worker_controller.rb', line 67

def start(workers)
  distribute workers do |worker|
    if worker.prepared?
      worker.update(status: :queueing,
                    moab_id: `qsub '#{worker.sh}'`.strip.split(/\./).first)
    else
      puts "Worker #{worker.id} could not be started: not prepared"
    end
  end
end
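The `moab_id` stored by #start is whatever precedes the first dot in the output of `qsub`. A minimal sketch of that parsing step follows; the output string is a made-up example, since the real format depends on the scheduler and its configuration.

```ruby
# Hypothetical qsub output: a job identifier followed by a host name.
qsub_output = "12345.cluster.example.org\n"

# Same chain as in #start: trim whitespace, split on dots, keep the first field.
moab_id = qsub_output.strip.split(/\./).first
# moab_id is now "12345"
```

This identifier is the value later interpolated into `canceljob` by #cancel.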

#sync ⇒ Array<DB::Worker>

Synchronize the status of jobs with the internal list of workers.

Returns:

  • (Array<DB::Worker>)

    the list of updated workers



# File 'lib/ripe/worker_controller.rb', line 102

def sync
  Syncer.new.workers
end