Class: Rwm::TaskRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/rwm/task_runner.rb

Defined Under Namespace

Classes: Result

Constant Summary collapse

NO_TASK_PATTERN =
/
  don.t\s+know\s+how\s+to\s+build\s+task
  |
  rake\s+--tasks
/ix

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors) ⇒ TaskRunner

Returns a new instance of TaskRunner.



25
26
27
28
29
30
31
32
# File 'lib/rwm/task_runner.rb', line 25

def initialize(graph, packages: nil, buffered: false, concurrency: Etc.nprocessors)
  @graph = graph
  @packages = packages || graph.packages.values
  @buffered = buffered
  @concurrency = concurrency
  @results = []
  @mutex = Mutex.new
end

Instance Attribute Details

#resultsObject (readonly)

Returns the value of attribute results.



23
24
25
# File 'lib/rwm/task_runner.rb', line 23

def results
  @results
end

Instance Method Details

#failed_resultsObject



137
138
139
# File 'lib/rwm/task_runner.rb', line 137

def failed_results
  @results.select { |r| r.failed? || r.errored? }
end

#run_command(&command_proc) ⇒ Object

Run a shell command in each package using DAG scheduling. Starts each package as soon as its dependencies complete. The command_proc receives a Package and returns [command, args] array.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/rwm/task_runner.rb', line 37

def run_command(&command_proc)
  package_names = @packages.map(&:name).to_set

  pending = @packages.dup
  completed = Set.new
  skipped = Set.new
  running = {}
  @interrupted = false

  mutex = Mutex.new
  condition = ConditionVariable.new

  previous_trap = Signal.trap("INT") do
    @interrupted = true
    # Cannot use mutex inside trap context — just kill threads directly.
    # Thread#kill is safe to call from trap context.
    running.each_value { |t| t.kill rescue nil }
  end

  done = false
  until done
    break if @interrupted

    mutex.synchronize do
      if pending.empty? && running.empty?
        done = true
        next
      end

      ready = pending.select { |pkg| ready?(pkg, package_names, completed) }

      ready.each do |pkg|
        break if running.size >= @concurrency
        break if @interrupted

        pending.delete(pkg)
        running[pkg.name] = Thread.new do
          begin
            result = run_single(pkg, &command_proc)
          rescue IOError
            # Thread killed during I/O (Ctrl+C) — suppress noise
            next
          rescue => e
            result = Result.new(
              package_name: pkg.name, task: "error",
              status: :errored, output: "Error: #{e.class}: #{e.message}"
            )
          ensure
            next unless result # thread was killed before completing

            mutex.synchronize do
              @results << result
              running.delete(pkg.name)
              if result.success?
                completed << pkg.name
              else
                skip_names = @graph.transitive_dependents(pkg.name)
                                   .select { |n| package_names.include?(n) }
                skip_names.each do |name|
                  skip_pkg = pending.find { |p| p.name == name }
                  if skip_pkg
                    pending.delete(skip_pkg)
                    skipped << name
                    @results << Result.new(
                      package_name: name, task: "skipped",
                      status: :dep_skipped, output: "Skipped due to failed dependency: #{pkg.name}"
                    )
                  end
                end
              end
              condition.broadcast
            end
          end
        end
      end

      if running.any? && ready.empty?
        condition.wait(mutex)
      end
    end
  end

  raise Interrupt, "Interrupted by Ctrl+C" if @interrupted

  @results
ensure
  Signal.trap("INT", previous_trap || "DEFAULT")
end

#run_task(task) ⇒ Object

Run a rake task in each package



127
128
129
130
131
# File 'lib/rwm/task_runner.rb', line 127

def run_task(task)
  run_command do |pkg|
    ["bundle", "exec", "rake", task]
  end
end

#success?Boolean

Returns:



133
134
135
# File 'lib/rwm/task_runner.rb', line 133

def success?
  @results.none? { |r| r.failed? || r.errored? }
end