Class: CobraCommander::Executor::WorkerPool

Inherits:
Object
  • Object
show all
Defined in:
lib/cobra_commander/executor/worker_pool.rb

Overview

This WorkerPooll will queue up jobs, and execute them using Worker’s, each with a thread running our work loop using the given runner.

  • A job is defined by a group of arguments to be passed to the runner.

  • A runner is an object that respond to #call(tty, *args), where TTY is an instance of TTY::Command, and *args are the arguments queued in the worker pool.

  • The runner call method must return an array of [status, output]

  • A worker manages a thread running the job wirh runner.call and updates the job result and output.

Defined Under Namespace

Classes: Job, Worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(runner:, workers:, printer:, jobs:, &name_f) ⇒ WorkerPool

Returns a new instance of WorkerPool.



58
59
60
61
62
63
64
65
66
# File 'lib/cobra_commander/executor/worker_pool.rb', line 58

def initialize(runner:, workers:, printer:, jobs:, &name_f)
  @tty = ::CobraCommander::Executor::IsolatedPTY.new(printer: printer)
  @runner = runner
  @workers = Array.new(workers) { |id| Worker.new(id, self) }
  @jobs = []
  @queue = Queue.new

  push_all(jobs, &(name_f || :describe))
end

Instance Attribute Details

#jobsObject (readonly)

Returns the value of attribute jobs.



56
57
58
# File 'lib/cobra_commander/executor/worker_pool.rb', line 56

def jobs
  @jobs
end

Instance Method Details

#error?Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/cobra_commander/executor/worker_pool.rb', line 68

def error?
  jobs.map(&:status).any?(:error)
end

#push(name, *args) ⇒ Object



76
77
78
79
80
81
# File 'lib/cobra_commander/executor/worker_pool.rb', line 76

def push(name, *args)
  Job.new(name, args).tap do |job|
    @jobs.push(job)
    @queue.push(job)
  end
end

#push_all(jobs, &name_f) ⇒ Object



72
73
74
# File 'lib/cobra_commander/executor/worker_pool.rb', line 72

def push_all(jobs, &name_f)
  jobs.each { push(name_f&.(_1), *Array(_1)) }
end

#run_nextObject



96
97
98
99
100
101
# File 'lib/cobra_commander/executor/worker_pool.rb', line 96

def run_next
  return :exit if @stop
  return :exit unless (job = @queue.pop)

  job.resolve!(*@runner.call(@tty, *job.args))
end

#startObject



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/cobra_commander/executor/worker_pool.rb', line 83

def start
  @stop = false
  @workers.each(&:spawn)
  @queue.close
  begin
    @workers.each(&:wait)
  rescue Interrupt
    @workers.each(&:kill) if @stop
    @stop = true
    retry
  end
end