Module: Jongleur::API
- Includes:
- Hollerback
- Defined in:
- lib/jongleur/api.rb
Overview
Here be methods to be accessed by the gem’s client, i.e. the public API
Instance Attribute Summary collapse
-
#task_graph ⇒ Hash<Symbol, Array<Symbol>>
Where the Hash key is the Task name and the value is an array of dependent Tasks.
-
#task_matrix ⇒ Array<Jongleur::Task>
A list of Tasks and their current state.
Class Method Summary collapse
-
.add_task_graph(task_graph_hash) ⇒ void
Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix.
-
.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that failed to finish successfully.
- .get_predecessor_pids(a_task) ⇒ Object
-
.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that started but failed to finish.
-
.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that haven’t been ran.
-
.print_graph(dir = "") ⇒ String
Prints the TaskGraph to a PDF file.
-
.run(&block) ⇒ void
The main method.
-
.start_processes ⇒ void
Starts all tasks without dependencies as separate processes.
-
.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that ran successfully.
- .task_graph ⇒ Object
- .task_matrix ⇒ Object
-
.trap_quit_signals ⇒ void
Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes.
Instance Attribute Details
#task_graph ⇒ Hash<Symbol, Array<Symbol>>
Returns where the Hash key is the Task name and the value is an array of dependent Tasks.
66 67 68 |
# File 'lib/jongleur/api.rb', line 66 def self.task_graph @@task_graph ||= {} end |
#task_matrix ⇒ Array<Jongleur::Task>
Returns a list of Tasks and their current state.
57 58 59 |
# File 'lib/jongleur/api.rb', line 57 def self.task_matrix @@task_matrix || [] end |
Class Method Details
.add_task_graph(task_graph_hash) ⇒ void
This method returns an undefined value.
Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/jongleur/api.rb', line 18 def self.add_task_graph(task_graph_hash) @@task_matrix = Array.new raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash) # this task_graph will raise the error below , { A: [:B], B: :C, C: []} task_graph_hash.values.each do |val| raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array) end # this task_graph will raise the error below , { A: [:B], B: [:C, :D], C: []} if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative? raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry' end @@task_graph = task_graph_hash @@task_matrix = Implementation.build_task_matrix(task_graph_hash) end |
.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that failed to finish successfully
84 85 86 |
# File 'lib/jongleur/api.rb', line 84 def self.failed_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == false } end |
.get_predecessor_pids(a_task) ⇒ Object
109 110 111 112 113 114 115 |
# File 'lib/jongleur/api.rb', line 109 def self.get_predecessor_pids(a_task) pids = Array.new Implementation.get_predecessors(a_task).each do |task| pids << Implementation.get_process_id(task) end pids end |
.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that started but failed to finish
103 104 105 106 107 |
# File 'lib/jongleur/api.rb', line 103 def self.hung_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == nil && x.pid != StatusCodes::PROCESS_NOT_YET_RAN } end |
.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that haven’t been ran
92 93 94 95 96 97 |
# File 'lib/jongleur/api.rb', line 92 def self.not_ran_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == nil && x.exit_status == nil && x.pid == StatusCodes::PROCESS_NOT_YET_RAN } end |
.print_graph(dir = "") ⇒ String
Prints the TaskGraph to a PDF file
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/jongleur/api.rb', line 37 def self.print_graph(dir="") graph = Graphviz::Graph.new dir = Dir.pwd if (!dir || dir.empty?) file_name = File.("jongleur_graph_#{Time.now.strftime('%m%d%Y_%H%M%S')}.pdf", dir) task_graph.each do |parent_node, child_nodes| new_node = unless graph.node_exists?(parent_node) graph.add_node( parent_node ) else graph.get_node( parent_node ).first end child_nodes.each { |child_node| new_node.add_node(child_node) } end Graphviz::output(graph, path: file_name) file_name end |
.run(&block) ⇒ void
This method launches processes without precedence constraints,
This method returns an undefined value.
The main method. It starts the tasks as separate processes, according to their precedence, traps and handles signals, processes messages. On exit it will also print the Task Matrix in the /tmp directory in JSON format
traps child process signals and starts new processes when their antecedents have finished. The method will exit its own process when all children processes have finished.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/jongleur/api.rb', line 127 def self.run(&block) unless Implementation.valid_tasks?(task_graph.keys) raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes' end Implementation. 'Starting workflow...' trap_quit_signals @finished_tasks_queue = [] start_processes trap(:CHLD) do begin # with WNOHANG flag we make sure Process.wait is not blocking while (res = Process.wait2(-1, Process::WNOHANG)) dead_pid, status = res finish_time = Time.now.to_f dead_task_name = '' Implementation.find_task_by(:pid, dead_pid) do |t| t.running = false t.exit_status = status.exitstatus t.success_status = status.success? t.finish_time = finish_time dead_task_name = t.name @finished_tasks_queue << { name: dead_task_name, done: status.success?, pid: dead_pid} end msg = "finished task: %s, process: %i, exit_status: %i, success: %s" Implementation. msg % [dead_task_name, dead_pid, status.exitstatus, status.success?] end # it's possible for the last CHLD signal to arrive after our trap # handler has already called Process.wait twice and reaped the # available status. In such a case we must handle (and ignore) # the oncoming exception so we don't get a crash. rescue Errno::ECHILD end end #trap loop do # run task's descendants as soon as task appears on 'finished' queue while task = @finished_tasks_queue.pop if task[:done] Implementation.run_descendants(task[:name]) else msg = "Task #{task[:name]} with process id #{task[:pid]} was not succesfully completed." Implementation.(msg) end end # We exit once all the child processes and their descendants are accounted for if Implementation.running_tasks.empty? Implementation. 'Workflow finished' file_name = File.("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp') File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) } hollerback_for(block) { |cb| cb.respond_with(:completed , task_matrix) } if block_given? exit 0 end sleep 1 end end |
.start_processes ⇒ void
This method returns an undefined value.
Starts all tasks without dependencies as separate processes
196 197 198 199 200 201 202 203 204 |
# File 'lib/jongleur/api.rb', line 196 def self.start_processes Implementation.tasks_without_predecessors.each do |t| t.running = true Implementation. "starting task #{t.name}" t.pid = fork do Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute end end end |
.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>
Analyses the Task Matrix for all Tasks that ran successfully
74 75 76 77 78 |
# File 'lib/jongleur/api.rb', line 74 def self.successful_tasks(my_task_matrix) my_task_matrix.select { |x| x.success_status == true && x.exit_status == 0 } end |
.task_graph ⇒ Object
66 67 68 |
# File 'lib/jongleur/api.rb', line 66 def self.task_graph @@task_graph ||= {} end |
.task_matrix ⇒ Object
57 58 59 |
# File 'lib/jongleur/api.rb', line 57 def self.task_matrix @@task_matrix || [] end |
.trap_quit_signals ⇒ void
This method returns an undefined value.
Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes
210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/jongleur/api.rb', line 210 def self.trap_quit_signals i[INT QUIT].each do |signal| Signal.trap(signal) do Implementation. " #{signal} sent to master process!" Implementation.running_tasks.each do |t| Implementation. "....killing #{t.pid}" Process.kill(:KILL, t.pid) end end end end |