Class: Rukawa::JobNet

Inherits:
AbstractJob show all
Includes:
Enumerable
Defined in:
lib/rukawa/job_net.rb

Instance Attribute Summary collapse

Attributes inherited from AbstractJob

#parent_job_net

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractJob

add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?

Constructor Details

#initialize(variables: {}, context: Context.new, parent_job_net: nil, resume_job_classes: []) ⇒ JobNet

Returns a new instance of JobNet.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/rukawa/job_net.rb', line 14

# Builds the DAG for this job net and, when resuming, decides which nodes run.
#
# @param variables [Hash] stored on the instance and handed to Dag#build
# @param context [Context] stored on the instance and handed to Dag#build
# @param parent_job_net [Object, nil] enclosing job net; nil means top level
#   (see #toplevel?)
# @param resume_job_classes [Array<Class>] node classes to resume from; when
#   empty, node states are left exactly as Dag#build produced them
def initialize(variables: {}, context: Context.new, parent_job_net: nil, resume_job_classes: [])
  @parent_job_net = parent_job_net
  @variables = variables
  @context = context
  @dag = Dag.new
  @dag.build(self, variables, context, self.class.dependencies)
  @resume_job_classes = resume_job_classes

  return if resume_job_classes.empty?

  # Pass 1: bypass every node, remembering those whose class was requested.
  entry_points = []
  @dag.tsort_each_node do |job|
    job.set_state(:bypassed)
    entry_points << job if resume_job_classes.include?(job.class)
  end

  # Pass 2: every node in the components the DAG yields when walking from a
  # requested node goes back to :waiting. This must run after pass 1 so the
  # :waiting marks are not overwritten by :bypassed.
  entry_points.each do |entry|
    @dag.each_strongly_connected_component_from(entry) do |component|
      component.each { |member| member.set_state(:waiting) }
    end
  end
end

Instance Attribute Details

#context ⇒ Object (readonly)

Returns the value of attribute context.



6
7
8
# File 'lib/rukawa/job_net.rb', line 6

# Reader for the context this job net was constructed with (the same object
# the constructor passes to Dag#build).
#
# @return [Object] the value stored in @context
def context
  @context
end

#dag ⇒ Object (readonly)

Returns the value of attribute dag.



6
7
8
# File 'lib/rukawa/job_net.rb', line 6

# Reader for the Dag instance created and built in the constructor.
#
# @return [Object] the value stored in @dag
def dag
  @dag
end

#variables ⇒ Object (readonly)

Returns the value of attribute variables.



6
7
8
# File 'lib/rukawa/job_net.rb', line 6

# Reader for the variables this job net was constructed with (the same object
# the constructor passes to Dag#build).
#
# @return [Object] the value stored in @variables
def variables
  @variables
end

Class Method Details

.dependencies ⇒ Object

Raises:

  • (NotImplementedError)


9
10
11
# File 'lib/rukawa/job_net.rb', line 9

# Hook that concrete job nets must override to declare their dependencies;
# the constructor passes the result to Dag#build.
#
# Note that NotImplementedError descends from ScriptError rather than
# StandardError, so a bare `rescue` will not catch a missing override.
#
# @raise [NotImplementedError] always, in this base implementation
def dependencies
  raise NotImplementedError, "Please override"
end

Instance Method Details

#dataflows ⇒ Object



78
79
80
# File 'lib/rukawa/job_net.rb', line 78

# Collects the dataflow of every node the DAG yields from #leveled_each.
#
# @return [Array] one dataflow per node, in #leveled_each order
def dataflows
  leveled_nodes = @dag.leveled_each
  leveled_nodes.map(&:dataflow)
end

#each(&block) ⇒ Object



127
128
129
# File 'lib/rukawa/job_net.rb', line 127

# Delegates iteration to the underlying DAG. This is the #each that the
# included Enumerable module builds on.
#
# @yield whatever @dag.each yields
# @return [Object] whatever @dag.each returns
def each(&block)
  @dag.each(&block)
end

#execute ⇒ Object



37
38
39
# File 'lib/rukawa/job_net.rb', line 37

# Calls #execute on every dataflow of this job net.
#
# @return [Array] the dataflows that were started (the list from #dataflows)
def execute
  flows = dataflows
  flows.each(&:execute)
end

#finished_at ⇒ Object



66
67
68
# File 'lib/rukawa/job_net.rb', line 66

# Latest finish time among the DAG's nodes.
#
# Compares the finish values themselves rather than their integer seconds, so
# two nodes finishing within the same second are ordered correctly. Nodes that
# have no finish time yet are ignored.
#
# @return [Object, nil] the greatest non-nil node finished_at, or nil when the
#   DAG has no nodes or none of them has finished
def finished_at
  @dag.nodes.map(&:finished_at).compact.max
end

#jobs_as_from ⇒ Object



123
124
125
# File 'lib/rukawa/job_net.rb', line 123

# Selects the DAG jobs that are leaves and have no outgoing edge whose
# cluster is this job net.
# NOTE(review): presumably these are the jobs used as edge sources when an
# edge leaves this job net — confirm against the DAG builder.
#
# @return [Array] matching jobs, in @dag.jobs order
def jobs_as_from
  @dag.jobs.select do |job|
    job.out_goings.none? { |edge| edge.cluster == self } && job.leaf?
  end
end

#jobs_as_to ⇒ Object



119
120
121
# File 'lib/rukawa/job_net.rb', line 119

# Selects the DAG jobs that are roots and have no incoming edge whose cluster
# is this job net.
# NOTE(review): presumably these are the jobs used as edge targets when an
# edge enters this job net — confirm against the DAG builder.
#
# @return [Array] matching jobs, in @dag.jobs order
def jobs_as_to
  @dag.jobs.select do |job|
    job.in_comings.none? { |edge| edge.cluster == self } && job.root?
  end
end

#output_dot(filename, format: nil) ⇒ Object



88
89
90
91
92
93
94
95
96
# File 'lib/rukawa/job_net.rb', line 88

# Writes this job net's graph to +filename+.
#
# With no format (or "dot") the DOT source is written as-is. Any other format
# is produced by piping the DOT source into the configured dot command
# (`<dot_command> -T<format> -o <filename>`).
#
# The DOT source is rendered before anything is opened, so a failure in
# #to_dot neither truncates an existing file nor leaves a child process
# behind. The pipe is opened in block form so it is closed even if the write
# raises.
#
# NOTE(review): the exit status of the dot command is not checked, as before.
#
# @param filename [String] destination path
# @param format [String, nil] output format passed to the dot command via -T
# @return [Integer, nil] bytes written for plain DOT output, nil when piping
def output_dot(filename, format: nil)
  dot_source = to_dot

  if format && format != "dot"
    # Array form: no shell is involved, so filename/format are not interpolated
    # into a command line.
    command = [Rukawa.config.dot_command.to_s, "-T#{format}", "-o", filename]
    IO.popen(command, "w") { |io| io.write(dot_source) }
    nil
  else
    File.write(filename, dot_source)
  end
end

#run(wait_interval = 1) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rukawa/job_net.rb', line 41

# Starts the job net inside a Concurrent::Promise and polls until every
# dataflow has completed.
#
# @param wait_interval [Numeric] seconds to sleep between completion checks
# @yield [self] before each sleep while at least one dataflow is incomplete
# @return [Object] the result of Concurrent::Promise#execute; the promise body
#   evaluates to the list returned by #execute
def run(wait_interval = 1)
  background = Concurrent::Promise.new do
    flows = execute

    # Poll for completion, handing control to the caller's block (if any)
    # before each sleep.
    until flows.all?(&:complete?)
      yield self if block_given?
      sleep wait_interval
    end

    # Unsatisfied dependencies are not logged; every other failure reason is.
    failures = flows.map(&:reason).compact
    failures.reject { |failure| failure.is_a?(DependencyUnsatisfied) }
            .each { |failure| Rukawa.logger.error(failure) }

    flows
  end
  background.execute
end

#started_at ⇒ Object



62
63
64
# File 'lib/rukawa/job_net.rb', line 62

# Earliest start time among the DAG's nodes.
#
# Compares the start values themselves rather than their integer seconds, so
# two nodes starting within the same second are ordered correctly. Nodes that
# have not started are ignored.
#
# @return [Object, nil] the smallest non-nil node started_at, or nil when the
#   DAG has no nodes or none of them has started
def started_at
  @dag.nodes.map(&:started_at).compact.min
end

#state ⇒ Object



82
83
84
85
86
# File 'lib/rukawa/job_net.rb', line 82

# Aggregate state of the job net: starts from Rukawa::State::Waiting and
# merges in each job's state, in iteration order.
#
# @return [Object] the merged state; Rukawa::State::Waiting itself when the
#   job net yields no jobs
def state
  reduce(Rukawa::State::Waiting) { |merged, job| merged.merge(job.state) }
end

#subgraph? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/rukawa/job_net.rb', line 74

# Whether this job net is nested inside another one; the exact negation of
# #toplevel?.
#
# @return [Boolean]
def subgraph?
  !toplevel?
end

#to_dot(subgraph = false) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rukawa/job_net.rb', line 98

# Renders this job net as Graphviz DOT source.
#
# The buffer is appended to in place (<<) rather than rebuilt with += on every
# step, so rendering does not re-copy the whole string for each job and edge.
#
# NOTE(review): names and labels are interpolated into double-quoted DOT
# strings without escaping embedded quotes.
#
# @param subgraph [Boolean] when true, emit a `subgraph "cluster_<name>"`
#   block (with `color = blue;`) instead of a top-level `digraph "<name>"`
# @return [String] the DOT source, terminated by "}\n"
def to_dot(subgraph = false)
  graphdef = subgraph ? "subgraph" : "digraph"
  prefix = subgraph ? "cluster_" : ""

  # dup guarantees a mutable buffer even on Rubies where interpolated literals
  # are frozen under the frozen_string_literal magic comment (before 3.0).
  buf = %Q|#{graphdef} "#{prefix}#{name}" {\n|.dup
  buf << %Q{label = "#{graph_label}";\n}
  buf << Rukawa.config.graph.attrs
  buf << Rukawa.config.graph.node.attrs
  buf << "color = blue;\n" if subgraph

  dag.each { |j| buf << j.to_dot_def }

  dag.edges.each do |edge|
    buf << %Q|"#{edge.from.name}" -> "#{edge.to.name}";\n|
  end

  buf << "}\n"
end

#to_dot_def ⇒ Object



115
116
117
# File 'lib/rukawa/job_net.rb', line 115

# DOT definition of this job net when it appears as a node of another graph:
# renders it in subgraph form. #to_dot calls #to_dot_def on each DAG member,
# so a nested job net is emitted as a `subgraph "cluster_..."` block.
#
# @return [String] same as to_dot(true)
def to_dot_def
  to_dot(true) # true => subgraph form
end

#toplevel? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/rukawa/job_net.rb', line 70

# Whether this job net has no parent, i.e. it was constructed with
# parent_job_net: nil.
#
# @return [Boolean]
def toplevel?
  @parent_job_net.nil?
end