Class: Rukawa::JobNet
Instance Attribute Summary collapse
Attributes inherited from AbstractJob
#parent_job_net
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from AbstractJob
add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?
Constructor Details
#initialize(parent_job_net, variables, *resume_job_classes) ⇒ JobNet
Returns a new instance of JobNet.
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# File 'lib/rukawa/job_net.rb', line 14
def initialize(parent_job_net, variables, *resume_job_classes)
@parent_job_net = parent_job_net
@dag = Dag.new
@dag.build(self, variables, self.class.dependencies)
@resume_job_classes = resume_job_classes
unless resume_job_classes.empty?
resume_targets = []
@dag.tsort_each_node do |node|
node.set_state(:bypassed)
resume_targets << node if resume_job_classes.include?(node.class)
end
resume_targets.each do |node|
@dag.each_strongly_connected_component_from(node) do |nodes|
nodes.each { |connected| connected.set_state(:waiting) }
end
end
end
end
|
Instance Attribute Details
#dag ⇒ Object
Returns the value of attribute dag.
6
7
8
|
# File 'lib/rukawa/job_net.rb', line 6
def dag
@dag
end
|
Class Method Details
.dependencies ⇒ Object
9
10
11
|
# File 'lib/rukawa/job_net.rb', line 9
def dependencies
raise NotImplementedError, "Please override"
end
|
Instance Method Details
#dataflows ⇒ Object
51
52
53
|
# File 'lib/rukawa/job_net.rb', line 51
def dataflows
@dag.leveled_each.map(&:dataflow)
end
|
#each(&block) ⇒ Object
100
101
102
|
# File 'lib/rukawa/job_net.rb', line 100
def each(&block)
@dag.each(&block)
end
|
#finished_at ⇒ Object
39
40
41
|
# File 'lib/rukawa/job_net.rb', line 39
def finished_at
@dag.nodes.max_by { |j| j.finished_at.to_i }.finished_at
end
|
#jobs_as_from ⇒ Object
96
97
98
|
# File 'lib/rukawa/job_net.rb', line 96
def jobs_as_from
@dag.jobs.select { |j| j.out_goings.select { |edge| edge.cluster == self }.empty? && j.leaf? }
end
|
#jobs_as_to ⇒ Object
92
93
94
|
# File 'lib/rukawa/job_net.rb', line 92
def jobs_as_to
@dag.jobs.select { |j| j.in_comings.select { |edge| edge.cluster == self }.empty? && j.root? }
end
|
#output_dot(filename, format: nil) ⇒ Object
61
62
63
64
65
66
67
68
69
|
# File 'lib/rukawa/job_net.rb', line 61
def output_dot(filename, format: nil)
if format && format != "dot"
io = IO.popen(["#{Rukawa.config.dot_command}", "-T#{format}", "-o", filename], "w")
io.write(to_dot)
io.close
else
File.open(filename, 'w') { |f| f.write(to_dot) }
end
end
|
#started_at ⇒ Object
35
36
37
|
# File 'lib/rukawa/job_net.rb', line 35
def started_at
@dag.nodes.min_by { |j| j.started_at ? j.started_at.to_i : Float::INFINITY }.started_at
end
|
#state ⇒ Object
55
56
57
58
59
|
# File 'lib/rukawa/job_net.rb', line 55
def state
inject(Rukawa::State::Waiting) do |state, j|
state.merge(j.state)
end
end
|
#subgraph? ⇒ Boolean
47
48
49
|
# File 'lib/rukawa/job_net.rb', line 47
def subgraph?
!toplevel?
end
|
#to_dot(subgraph = false) ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/rukawa/job_net.rb', line 71
def to_dot(subgraph = false)
graphdef = subgraph ? "subgraph" : "digraph"
buf = %Q|#{graphdef} "#{subgraph ? "cluster_" : ""}#{name}" {\n|
buf += %Q{label = "#{graph_label}";\n}
buf += Rukawa.config.graph.attrs
buf += Rukawa.config.graph.node.attrs
buf += "color = blue;\n" if subgraph
dag.each do |j|
buf += j.to_dot_def
end
dag.edges.each do |edge|
buf += %Q|"#{edge.from.name}" -> "#{edge.to.name}";\n|
end
buf += "}\n"
end
|
#to_dot_def ⇒ Object
88
89
90
|
# File 'lib/rukawa/job_net.rb', line 88
def to_dot_def
to_dot(true)
end
|
#toplevel? ⇒ Boolean
43
44
45
|
# File 'lib/rukawa/job_net.rb', line 43
def toplevel?
@parent_job_net.nil?
end
|