Module: Jongleur::API

Defined in:
lib/jongleur/api.rb

Overview

Here be methods to be accessed by the gem’s client, i.e. the public API

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.task_graphObject



65
66
67
# File 'lib/jongleur/api.rb', line 65

def self.task_graph
  @@task_graph ||= {}
end

.task_matrixObject



56
57
58
# File 'lib/jongleur/api.rb', line 56

def self.task_matrix
  @@task_matrix
end

Class Method Details

.add_task_graph(task_graph_hash) ⇒ void

This method returns an undefined value.

Accepts a task_graph and does some initialisation, namely the assigning of class variables and creation of the inital task matrix

Parameters:

  • task_graph_hash (Hash<Symbol, Array>)

Raises:

  • (ArgumentError)

    if the task_matrix argument is not structured correctly



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/jongleur/api.rb', line 17

def self.add_task_graph(task_graph_hash)
  @@task_matrix = Array.new
  raise ArgumentError, 'Value should be Hash {task_name, [descendants]}' unless task_graph_hash.is_a?(Hash)
  # this task_graph will raise the error below , { A: [:B],  B: :C,  C: []}
  task_graph_hash.values.each do |val|
    raise ArgumentError, 'Dependent Tasks should be wrapped in an Array {task_name, [dependents]}' unless val.is_a?(Array)
  end
  # this task_graph will raise the error below , { A: [:B],  B: [:C, :D],  C: []}
  if (task_graph_hash.keys.size - task_graph_hash.values.flatten.uniq.size).negative?
    raise ArgumentError, 'Each dependent Task should also be defined with a separate key entry'
  end
  @@task_graph = task_graph_hash
  @@task_matrix = Implementation.build_task_matrix(task_graph_hash)
end

.failed_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that failed to finish successfully

Parameters:

Returns:



83
84
85
# File 'lib/jongleur/api.rb', line 83

def self.failed_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == false }
end

.get_predecessor_pids(a_task) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/jongleur/api.rb', line 108

def self.get_predecessor_pids(a_task)
  pids = Array.new
  Implementation.get_predecessors(a_task).each do |task|
    pids << Implementation.get_process_id(task)
  end
  pids
end

.hung_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that started but failed to finish

Parameters:

Returns:

  • (Array<Jongleur::Task>)

    the Tasks that started but failed to finish



102
103
104
105
106
# File 'lib/jongleur/api.rb', line 102

def self.hung_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.pid != StatusCodes::PROCESS_NOT_YET_RAN
  }
end

.not_ran_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that haven’t been ran

Parameters:

Returns:



91
92
93
94
95
96
# File 'lib/jongleur/api.rb', line 91

def self.not_ran_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == nil &&
    x.exit_status == nil &&
    x.pid == StatusCodes::PROCESS_NOT_YET_RAN
  }
end

Prints the TaskGraph to a PDF file

Parameters:

  • the (String)

    directory name to print the file to

Returns:

  • (String)

    the PDF file name



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/jongleur/api.rb', line 36

def self.print_graph(dir="")
  graph = Graphviz::Graph.new
  dir = Dir.pwd if (!dir || dir.empty?)
  file_name = File.expand_path("jongleur_graph_#{Time.now.strftime('%m%d%Y_%H%M%S')}.pdf", dir)
  task_graph.each do |parent_node, child_nodes|
    new_node = unless graph.node_exists?(parent_node)
      graph.add_node( parent_node )
    else
      graph.get_node( parent_node ).first
    end

    child_nodes.each { |child_node| new_node.add_node(child_node) }
  end
  Graphviz::output(graph, path: file_name)
  file_name
end

.runvoid

Note:

This method launches processes without precedence constraints,

This method returns an undefined value.

The main method. It starts the tasks as separate processes, according to their precedence, traps and handles signals, processes messages. On exit it will also print the Task Matrix in the /tmp directory in JSON format

traps child process signals and starts new processes when their antecedents have finished. The method will exit its own process when all children processes have finished.

Raises:

  • (RuntimeError)

    if there are no implementations for Tasks in the Task Graph



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/jongleur/api.rb', line 126

def self.run
  unless Implementation.valid_tasks?(task_graph.keys)
    raise RuntimeError, 'Not all the tasks in the Task Graph are implemented as WorkerTask classes'
  end

  Implementation.process_message 'Starting workflow...'
  trap_quit_signals
  start_processes

  trap(:CHLD) do
    begin
      # with WNOHANG flag we make sure Process.wait is not blocking
      while (res = Process.wait2(-1, Process::WNOHANG))
        dead_pid = res[0]
        status = res[1]
        dead_task_name = ''
        Implementation.find_task_by(:pid, dead_pid) do |t|
          t.running = false
          t.exit_status = status.exitstatus
          t.success_status = status.success?
          dead_task_name = t.name
        end
        msg = "finished task: %s, process: %i, exit_status: %i, success: %s"
        Implementation.process_message msg % [dead_task_name,
                                              dead_pid,
                                              status.exitstatus,
                                              status.success?]

        if status.success?
          Implementation.run_descendants(dead_task_name)
        else
          msg = "Task #{dead_task_name} with process id #{dead_pid} was not succesfully completed."
          Implementation.process_message(msg)
        end
      end

      # it's possible for the last CHLD signal to arrive after our trap
      # handler has already called Process.wait twice and reaped the
      # available status. In such a case we must handle (and ignore)
      # the oncoming exception so we don't get a crash.
    rescue Errno::ECHILD
    end
  end

  loop do
    # We exit once all the child processes and their descendants are
    # accounted for
    if Implementation.running_tasks.empty?
      Implementation.process_message 'Workflow finished'
      file_name = File.expand_path("jongleur_task_matrix_#{Time.now.strftime('%m%d%Y_%H%M%S')}.json", '/tmp')
      File.open(file_name, 'w') {|f| f.write(task_matrix.to_json) }
      exit 0
    end
    sleep 1
  end
end

.start_processesvoid

This method returns an undefined value.

Starts all tasks without dependencies as separate processes



187
188
189
190
191
192
193
194
195
# File 'lib/jongleur/api.rb', line 187

def self.start_processes
  Implementation.tasks_without_predecessors.each do |t|
    t.running = true
    Implementation.process_message "starting task #{t.name}"
    t.pid = fork do
      Jongleur.const_get(t.name).new(predecessors: Implementation.get_predecessors(t.name)).execute
    end
  end
end

.successful_tasks(my_task_matrix) ⇒ Array<Jongleur::Task>

Analyses the Task Matrix for all Tasks that ran successfully

Parameters:

Returns:



73
74
75
76
77
# File 'lib/jongleur/api.rb', line 73

def self.successful_tasks(my_task_matrix)
  my_task_matrix.select { |x| x.success_status == true &&
    x.exit_status == 0
  }
end

.trap_quit_signalsvoid

This method returns an undefined value.

Forwards any quit signals to all working processes so that quitting the gem (Ctrl+C) kills all processes



201
202
203
204
205
206
207
208
209
210
211
# File 'lib/jongleur/api.rb', line 201

def self.trap_quit_signals
  %i[INT QUIT].each do |signal|
    Signal.trap(signal) do
      Implementation.process_message " #{signal} sent to master process!"
      Implementation.running_tasks.each do |t|
        Implementation.process_message "....killing #{t.pid}"
        Process.kill(:KILL, t.pid)
      end
    end
  end
end