Class: Rwm::TaskRunner
- Inherits:
-
Object
- Object
- Rwm::TaskRunner
- Defined in:
- lib/rwm/task_runner.rb
Defined Under Namespace
Classes: Result
Constant Summary collapse
- NO_TASK_PATTERN =
/ don.t\s+know\s+how\s+to\s+build\s+task | rake\s+--tasks /ix
Instance Attribute Summary collapse
-
#results ⇒ Object
readonly
Returns the value of attribute results.
Instance Method Summary collapse
- #failed_results ⇒ Object
-
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
constructor
A new instance of TaskRunner.
-
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling.
-
#run_task(task) ⇒ Object
Run a rake task in each package.
- #success? ⇒ Boolean
Constructor Details
#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner
Returns a new instance of TaskRunner.
25 26 27 28 29 30 31 32 |
# File 'lib/rwm/task_runner.rb', line 25 def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) @graph = graph @packages = packages || graph.packages.values @buffered = buffered @concurrency = concurrency @results = [] @mutex = Mutex.new end |
Instance Attribute Details
#results ⇒ Object (readonly)
Returns the value of attribute results.
23 24 25 |
# File 'lib/rwm/task_runner.rb', line 23 def results @results end |
Instance Method Details
#failed_results ⇒ Object
137 138 139 |
# File 'lib/rwm/task_runner.rb', line 137 def failed_results @results.select { |r| r.failed? || r.errored? } end |
#run_command(&command_proc) ⇒ Object
Run a shell command in each package using DAG scheduling. Starts each package as soon as its dependencies complete. The command_proc receives a Package and returns [command, args] array.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rwm/task_runner.rb', line 37 def run_command(&command_proc) package_names = @packages.map(&:name).to_set pending = @packages.dup completed = Set.new skipped = Set.new running = {} @interrupted = false mutex = Mutex.new condition = ConditionVariable.new previous_trap = Signal.trap("INT") do @interrupted = true # Cannot use mutex inside trap context — just kill threads directly. # Thread#kill is safe to call from trap context. running.each_value { |t| t.kill rescue nil } end done = false until done break if @interrupted mutex.synchronize do if pending.empty? && running.empty? done = true next end ready = pending.select { |pkg| ready?(pkg, package_names, completed) } ready.each do |pkg| break if running.size >= @concurrency break if @interrupted pending.delete(pkg) running[pkg.name] = Thread.new do begin result = run_single(pkg, &command_proc) rescue IOError # Thread killed during I/O (Ctrl+C) — suppress noise next rescue => e result = Result.new( package_name: pkg.name, task: "error", status: :errored, output: "Error: #{e.class}: #{e.message}" ) ensure next unless result # thread was killed before completing mutex.synchronize do @results << result running.delete(pkg.name) if result.success? completed << pkg.name else skip_names = @graph.transitive_dependents(pkg.name) .select { |n| package_names.include?(n) } skip_names.each do |name| skip_pkg = pending.find { |p| p.name == name } if skip_pkg pending.delete(skip_pkg) skipped << name @results << Result.new( package_name: name, task: "skipped", status: :dep_skipped, output: "Skipped due to failed dependency: #{pkg.name}" ) end end end condition.broadcast end end end end if running.any? && ready.empty? condition.wait(mutex) end end end raise Interrupt, "Interrupted by Ctrl+C" if @interrupted @results ensure Signal.trap("INT", previous_trap || "DEFAULT") end |
#run_task(task) ⇒ Object
Run a rake task in each package
127 128 129 130 131 |
# File 'lib/rwm/task_runner.rb', line 127 def run_task(task) run_command do |pkg| ["bundle", "exec", "rake", task] end end |
#success? ⇒ Boolean
133 134 135 |
# File 'lib/rwm/task_runner.rb', line 133 def success? @results.none? { |r| r.failed? || r.errored? } end |