Module: Jongleur::API

Includes:
Hollerback
Defined in:
lib/jongleur/api.rb

Overview

The methods in this module are intended to be called by the gem’s client; together they make up the public API.

Instance Attribute Details

#task_graph ⇒ Hash<Symbol, Array<Symbol>>

Returns a Hash where the key is the Task name and the value is an array of dependent Tasks.

Examples:

a_task_graph = {:A=>[:B, :C], :B=>[:D], :C=>[:D], :D=>[:E], :E=>[]}

Returns:

  • (Hash<Symbol, Array<Symbol>>)

    where the Hash key is the Task name and the value is an array of dependent Tasks



# File 'lib/jongleur/api.rb', line 66

def self.task_graph
  @@task_graph ||= {}
end

#task_matrix ⇒ Array<Jongleur::Task>

Returns a list of Tasks and their current state.

Returns:

  • (Array<Jongleur::Task>)

    a list of Tasks and their current state



# File 'lib/jongleur/api.rb', line 57

def self.task_matrix
  @@task_matrix || []
end

Class Method Details

.add_task_graph(task_graph_hash) ⇒ void

This method returns an undefined value.

Accepts a task_graph and performs initialisation: it assigns the class variables and creates the initial Task Matrix.

Parameters:

  • task_graph_hash (Hash<Symbol, Array>)

Raises:

  • (ArgumentError)

    if the task_graph_hash argument is not structured correctly



# File 'lib/jongleur/api.rb', line 18

def self.add_task_graph(task_graph_hash)
  @@task_matrix = Array.new
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash)
  # this task_graph will raise the error below , { A: [:B],  B: :C,  C: []}
  task_graph_hash.values.each do |val|
    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array)
  end
  # this task_graph will raise the error below: { A: [:B], B: [:C, :D] }
  if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative?
    raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry'
  end
  @@task_graph = task_graph_hash
  @@task_matrix = Implementation.build_task_matrix(task_graph_hash)
end
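
The validations above can be tried in isolation. The following is a minimal sketch that copies the three checks into a standalone helper; validate_task_graph and validation_error are hypothetical names used only for illustration and are not part of the gem.

```ruby
# Hypothetical standalone helper mirroring the checks in add_task_graph.
# It is not part of Jongleur; it only reproduces the validations shown above.
def validate_task_graph(task_graph_hash)
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash)

  task_graph_hash.each_value do |val|
    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array)
  end

  # Fewer keys than distinct dependents means at least one dependent has no key.
  if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative?
    raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry'
  end

  true
end

# Hypothetical helper: returns the error message for an invalid graph, nil otherwise.
def validation_error(graph)
  validate_task_graph(graph)
  nil
rescue ArgumentError => e
  e.message
end
```

For example, validation_error({ A: [:B], B: :C, C: [] }) reports the missing Array, and validation_error({ A: [:B], B: [:C, :D] }) reports the missing key entries. The last check only compares counts, so a graph such as { A: [:B], B: [:C, :D], C: [] } (three keys, three distinct dependents) passes even though :D has no key of its own.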

.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that failed to finish successfully

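Parameters:

  • my_task_matrix (Array<Jongleur::Task>)

    the Task Matrix to analyse

Returns:

  • (Array<Jongleur::Task>)

    the Tasks that failed to finish successfully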



# File 'lib/jongleur/api.rb', line 84

def self.failed_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == false }
end

.get_predecessor_pids(a_task) ⇒ Object

Collects the process ids of the Tasks that precede the given Task.



# File 'lib/jongleur/api.rb', line 109

def self.get_predecessor_pids(a_task)
  pids = Array.new
  Implementation.get_predecessors(a_task).each do |task|
    pids << Implementation.get_process_id(task)
  end
  pids
end
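
The source of Implementation.get_predecessors and Implementation.get_process_id is not shown on this page. As a rough sketch of the idea, assuming that a predecessor of a Task is any key whose array of dependents lists that Task, the lookup could work as follows. Both helpers and the name-to-pid Hash are hypothetical.

```ruby
# Hypothetical helper: a predecessor is any Task whose dependents include task_name.
def predecessors_of(task_graph, task_name)
  task_graph.select { |_name, dependents| dependents.include?(task_name) }.keys
end

# Hypothetical helper: looks up each predecessor's process id in a name => pid Hash.
def predecessor_pids(task_graph, pids_by_name, task_name)
  predecessors_of(task_graph, task_name).map { |name| pids_by_name[name] }
end
```

With the example graph {:A=>[:B, :C], :B=>[:D], :C=>[:D], :D=>[:E], :E=>[]}, Task :D has the predecessors :B and :C, while :A has none.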

.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that started but failed to finish

Parameters:

  • my_task_matrix (Array<Jongleur::Task>)

    the Task Matrix to analyse

Returns:

  • (Array<Jongleur::Task>)

    the Tasks that started but failed to finish



# File 'lib/jongleur/api.rb', line 103

def self.hung_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.pid != StatusCodes::PROCESS_NOT_YET_RAN
  }
end

.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

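Analyses the Task Matrix for all Tasks that have not been run

Parameters:

  • my_task_matrix (Array<Jongleur::Task>)

    the Task Matrix to analyse

Returns:

  • (Array<Jongleur::Task>)

    the Tasks that have not been run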



# File 'lib/jongleur/api.rb', line 92

def self.not_ran_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.exit_status == nil &&
    x.pid == StatusCodes::PROCESS_NOT_YET_RAN
  }
end
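
The status filters (successful_tasks, failed_tasks, hung_tasks and not_ran_tasks) all read the same three fields of a Task. The sketch below shows how those fields separate the four cases. SketchTask is a stand-in Struct rather than the real Jongleur::Task, and NOT_YET_RAN = -1 is an assumed placeholder, since the actual value of StatusCodes::PROCESS_NOT_YET_RAN is not shown here.

```ruby
# Stand-in for Jongleur::Task holding only the fields the filters read.
SketchTask = Struct.new(:name, :pid, :exit_status, :success_status, keyword_init: true)

# Assumed placeholder for StatusCodes::PROCESS_NOT_YET_RAN (real value not shown).
NOT_YET_RAN = -1

# Hypothetical helper applying the same conditions as the four filter methods.
def classify(task)
  if task.success_status == true && task.exit_status == 0
    :successful
  elsif task.success_status == false
    :failed
  elsif task.success_status.nil? && task.pid != NOT_YET_RAN
    :hung      # a pid was assigned, but no exit status was ever recorded
  elsif task.success_status.nil? && task.exit_status.nil? && task.pid == NOT_YET_RAN
    :not_ran   # the Task was never started
  end
end

# Groups Task names by category, e.g. for a summary after a workflow run.
def summarise(tasks)
  tasks.group_by { |t| classify(t) }.transform_values { |ts| ts.map(&:name) }
end
```

A Task whose predecessor failed never receives a pid, so it ends up under :not_ran rather than :hung.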

.print_graph(dir = "") ⇒ String

Prints the TaskGraph to a PDF file

Parameters:

  • dir (String) (defaults to: "")

    the directory name to print the file to

Returns:

  • (String)

    the PDF file name



# File 'lib/jongleur/api.rb', line 37

def self.print_graph(dir="")
  graph = Graphviz::Graph.new
  dir = Dir.pwd if (!dir || dir.empty?)
  file_name = File.expand_path("jongleur_graph_#{Time.now.strftime('%m%d%Y_%H%M%S')}.pdf", dir)
  task_graph.each do |parent_node, child_nodes|
    new_node = unless graph.node_exists?(parent_node)
      graph.add_node( parent_node )
    else
      graph.get_node( parent_node ).first
    end

    child_nodes.each { |child_node| new_node.add_node(child_node) }
  end
  Graphviz::output(graph, path: file_name)
  file_name
end
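
To inspect a graph without the Graphviz dependency, the same parent/child traversal can emit plain DOT text. This is only an illustrative sketch: task_graph_to_dot and graph_file_name are hypothetical helpers, and the DOT output is a swapped-in substitute for the Graphviz::Graph calls above, not what print_graph produces. The second helper reproduces the timestamped file name, with the clock passed in so the result is predictable.

```ruby
# Hypothetical helper: renders a task graph Hash as DOT text.
def task_graph_to_dot(task_graph)
  lines = ['digraph jongleur {']
  task_graph.each do |parent, children|
    lines << "  #{parent};"                                   # declare the node
    children.each { |child| lines << "  #{parent} -> #{child};" } # one edge per dependent
  end
  lines << '}'
  lines.join("\n")
end

# Hypothetical helper: builds the file path the way print_graph does,
# falling back to the current directory when dir is nil or empty.
def graph_file_name(dir = '', now = Time.now)
  dir = Dir.pwd if dir.nil? || dir.empty?
  File.expand_path("jongleur_graph_#{now.strftime('%m%d%Y_%H%M%S')}.pdf", dir)
end
```

Note that the timestamp format is month, day, year (%m%d%Y), so file names do not sort chronologically across years.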

.run(&block) ⇒ void

Note:

This method first launches the processes that have no precedence constraints, then traps child process signals and starts new processes when their antecedents have finished. The method will exit its own process when all child processes have finished.

This method returns an undefined value.

The main method. It starts the tasks as separate processes according to their precedence, traps and handles signals, and processes messages. On exit it also writes the Task Matrix to the /tmp directory in JSON format.

Raises:

  • (RuntimeError)

    if there are no implementations for Tasks in the Task Graph



# File 'lib/jongleur/api.rb', line 127

def self.run(&block)
  unless Implementation.valid_tasks?(task_graph.keys)
    raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes'
  end

  Implementation.process_message 'Starting workflow...'
  trap_quit_signals
  @finished_tasks_queue = []
  start_processes

  trap(:CHLD) do
    begin
      # with WNOHANG flag we make sure Process.wait is not blocking
      while (res = Process.wait2(-1, Process::WNOHANG))
        dead_pid, status = res
        finish_time = Time.now.to_f
        dead_task_name = ''
        Implementation.find_task_by(:pid, dead_pid) do |t|
          t.running = false
          t.exit_status = status.exitstatus
          t.success_status = status.success?
          t.finish_time = finish_time
          dead_task_name = t.name
          @finished_tasks_queue << { name: dead_task_name, done: status.success?, pid: dead_pid}
        end
        msg = "finished task: %s, process: %i, exit_status: %i, success: %s"
        Implementation.process_message msg % [dead_task_name,
                                              dead_pid,
                                              status.exitstatus,
                                              status.success?]


      end

      # it's possible for the last CHLD signal to arrive after our trap
      # handler has already called Process.wait twice and reaped the
      # available status. In such a case we must handle (and ignore)
      # the oncoming exception so we don't get a crash.
    rescue Errno::ECHILD
    end
  end #trap

  loop do
    # run task's descendants as soon as task appears on 'finished' queue
    while task = @finished_tasks_queue.pop
      if task[:done]
        Implementation.run_descendants(task[:name])
      else
        msg = "Task #{task[:name]} with process id #{task[:pid]} was not succesfully completed."
        Implementation.process_message(msg)
      end
    end

    # We exit once all the child processes and their descendants are accounted for
    if Implementation.running_tasks.empty?
      Implementation.process_message 'Workflow finished'
      file_name = File.expand_path("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp')
      File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) }
      hollerback_for(block) { |cb| cb.respond_with(:completed , task_matrix) } if block_given?
      exit 0
    end
    sleep 1
  end
end
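
The CHLD handler above relies on Process.wait2 with Process::WNOHANG, which returns nil instead of blocking when no child has exited yet. The following minimal sketch runs the same reaping loop outside a signal handler, assuming a Unix-like platform where fork is available. reap_children is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper: forks one child per exit code and reaps them without blocking.
def reap_children(exit_codes)
  # exit! skips at_exit hooks, so each child terminates at once with the given code.
  pids = exit_codes.map { |code| fork { exit!(code) } }
  results = {}

  until pids.all? { |pid| results.key?(pid) }
    # With WNOHANG, wait2 returns nil when no child has changed state yet.
    res = Process.wait2(-1, Process::WNOHANG)
    if res
      pid, status = res
      results[pid] = { exit_status: status.exitstatus, success: status.success? }
    else
      sleep 0.01
    end
  end

  # Report the results in the order the children were started.
  pids.map { |pid| results[pid] }
rescue Errno::ECHILD
  # Raised when there are no children left to wait for.
  []
end
```

Calling reap_children([0, 3]) yields one entry per child, with success true only for the child that exited with status 0. These are the same two values that run stores in exit_status and success_status.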

.start_processes ⇒ void

This method returns an undefined value.

Starts all tasks without dependencies as separate processes



# File 'lib/jongleur/api.rb', line 196

def self.start_processes
  Implementation.tasks_without_predecessors.each do |t|
    t.running = true
    Implementation.process_message "starting task #{t.name}"
    t.pid = fork do
      Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute
    end
  end
end
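
Implementation.tasks_without_predecessors is not shown on this page. One way to picture it, assuming a Task has no predecessors when it never appears in any array of dependents, is the sketch below. It also simulates the order in which the workflow would start Tasks, under the further assumption that a Task starts only once all of its predecessors have finished. Both helpers are hypothetical.

```ruby
# Hypothetical helper: Tasks that never appear as a dependent have no predecessors.
def tasks_without_predecessors(task_graph)
  task_graph.keys - task_graph.values.flatten.uniq
end

# Hypothetical helper: groups Tasks into waves that could start together,
# assuming a Task is ready once every one of its predecessors has finished.
def launch_waves(task_graph)
  finished = []
  waves = []
  remaining = task_graph.keys

  until remaining.empty?
    ready = remaining.select do |name|
      preds = task_graph.select { |_k, deps| deps.include?(name) }.keys
      (preds - finished).empty?
    end
    break if ready.empty? # guards against cycles, which would never become ready

    waves << ready
    finished.concat(ready)
    remaining -= ready
  end

  waves
end
```

For the example graph {:A=>[:B, :C], :B=>[:D], :C=>[:D], :D=>[:E], :E=>[]}, only :A is started by start_processes; :B and :C can then run in parallel, followed by :D and :E.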

.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that ran successfully

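Parameters:

  • my_task_matrix (Array<Jongleur::Task>)

    the Task Matrix to analyse

Returns:

  • (Array<Jongleur::Task>)

    the Tasks that ran successfully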



# File 'lib/jongleur/api.rb', line 74

def self.successful_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == true &&
    x.exit_status == 0
  }
end

.trap_quit_signals ⇒ void

This method returns an undefined value.

Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes



# File 'lib/jongleur/api.rb', line 210

def self.trap_quit_signals
  %i[INT QUIT].each do |signal|
    Signal.trap(signal) do
      Implementation.process_message " #{signal} sent to master process!"
      Implementation.running_tasks.each do |t|
        Implementation.process_message "....killing #{t.pid}"
        Process.kill(:KILL, t.pid)
      end
    end
  end
end
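
The forwarding pattern can be exercised on its own. The sketch below is a hypothetical helper, assuming a Unix-like platform with fork. It uses USR1 in place of INT or QUIT so that it does not interfere with an interactive session, and it restores the previous handler afterwards.

```ruby
# Hypothetical helper: forks sleeping children, then kills them from a signal handler.
def forward_signal_to_children(signal, child_count)
  pids = Array.new(child_count) { fork { sleep 30; exit!(0) } }

  # Same shape as trap_quit_signals: the handler kills every child process.
  previous = Signal.trap(signal) do
    pids.each { |pid| Process.kill(:KILL, pid) }
  end

  # Simulate the user signalling the master process.
  Process.kill(signal, Process.pid)

  # Collect each child's status and check that it was terminated by SIGKILL.
  statuses = pids.map { |pid| Process.wait2(pid).last }
  statuses.map { |status| status.signaled? && status.termsig == Signal.list['KILL'] }
ensure
  # Restore whatever handler was installed before the sketch ran.
  Signal.trap(signal, previous || 'DEFAULT')
end
```

Because the children are killed with KILL rather than TERM, they get no chance to clean up. A child that dies this way has no exit code (exitstatus is nil), and success? returns nil rather than false.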