Class: Elaine::Distributed::Coordinator

Inherits:
Object
Includes:
Celluloid, Celluloid::Logger
Defined in:
lib/elaine/distributed/coordinator.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(graph: nil, num_partitions: 1, stop_condition: Celluloid::Condition.new) ⇒ Coordinator

Returns a new instance of Coordinator.



16
17
18
19
20
21
22
23
# File 'lib/elaine/distributed/coordinator.rb', line 16

# Build a coordinator that has no registered workers and no partitions yet.
#
# @param graph [Object, nil] the vertex collection to distribute; #partition
#   calls #size and #each_slice on it and #zipcodes reads vertex[:id]
# @param num_partitions [Integer] stored and exposed via #num_partitions
#   (NOTE(review): #partition slices by worker count, not by this value)
# @param stop_condition [Celluloid::Condition] signalled with +true+ by
#   #stop and #run_and_stop
def initialize(graph: nil, num_partitions: 1, stop_condition: Celluloid::Condition.new)
  # NOTE(review): this interpolates the entire graph into an info-level
  # message; for large graphs consider logging only its size.
  info "GOT GRAPH: #{graph}"

  @graph = graph
  @num_partitions = num_partitions
  @stop_condition = stop_condition

  @workers = []     # node ids, in registration order
  @partitions = {}  # node id => slice of vertices, filled by #partition
end

Instance Attribute Details

#num_partitions ⇒ Object (readonly)

Returns the value of attribute num_partitions.



14
15
16
# File 'lib/elaine/distributed/coordinator.rb', line 14

# The partition count given to the constructor (defaults to 1).
def num_partitions; @num_partitions; end

#partitions ⇒ Object (readonly)

Returns the value of attribute partitions.



13
14
15
# File 'lib/elaine/distributed/coordinator.rb', line 13

# Worker node id => slice of vertices, as last computed by #partition.
def partitions; @partitions; end

#workers ⇒ Object (readonly)

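Returns the value of attribute workers. (The source comment attached to this attribute reads `finalizer :shutdown`, which appears to be a commented-out Celluloid finalizer declaration rather than documentation.)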



12
13
14
# File 'lib/elaine/distributed/coordinator.rb', line 12

# Registered worker node ids, in registration order, without duplicates.
def workers; @workers; end

Instance Method Details

#graph=(g) ⇒ Object



25
26
27
28
29
# File 'lib/elaine/distributed/coordinator.rb', line 25

# Replace the vertex collection that the next #partition call will split.
# Emits a debug message before and after the swap.
#
# @param g [Object] the new graph (vertex collection)
def graph=(g)
  debug("Setting graph")
  @graph = g
  debug("done setting graph")
end

#partition ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/elaine/distributed/coordinator.rb', line 41

# Split the graph into contiguous slices and assign them, in order, to the
# registered workers. The result is stored in @partitions as
# worker node id => array of vertices.
#
# Every slice is ceil(graph size / worker count) vertices long, except
# possibly the last. When there are more workers than resulting slices
# (e.g. 5 vertices over 4 workers gives slices of 2, 2, 1), the surplus
# workers get no entry in the hash.
#
# @return [Hash] the freshly built worker => vertices map ({} for an empty graph)
# @raise [RuntimeError] if no workers have been registered
def partition
  # not sure if we should re-initialize or not
  @partitions = {}

  vertex_count = @graph.size
  worker_count = @workers.size
  # Without this guard the division below produces Infinity/NaN and the
  # ceiling raises an opaque FloatDomainError.
  raise "cannot partition graph: no workers registered" if worker_count.zero?
  # each_slice(0) raises ArgumentError, so an empty graph has no partitions.
  return @partitions if vertex_count.zero?

  # Integer ceiling division: the maximum number of vertices per worker.
  slice_size = (vertex_count + worker_count - 1) / worker_count

  @graph.each_slice(slice_size).with_index do |slice, index|
    @partitions[@workers[index]] = slice
  end

  @partitions
end

#register_worker(worker_node) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/elaine/distributed/coordinator.rb', line 54

# Add a worker node id to the roster unless it is already registered.
#
# Design note: a single node could in principle host several workers, but
# running several nodes on one machine seems cleaner than packing several
# workers into one node. This deserves another look at some point.
#
# @param worker_node [Object] the DCell node id of the worker
# @return [Array, nil] the workers list when the id was added; nil when it
#   was already present
def register_worker(worker_node)
  info "Registering worker: #{worker_node}"
  @workers << worker_node unless @workers.include?(worker_node)
end

#run_and_stop ⇒ Object



120
121
122
123
# File 'lib/elaine/distributed/coordinator.rb', line 120

# Run the job to completion, then signal the stop condition with +true+.
#
# NOTE(review): if run_until_finished raises, the condition is never
# signalled, so anything waiting on it presumably keeps waiting; consider
# whether an ensure is appropriate.
#
# @return [Object] the result of signalling the stop condition
def run_and_stop
  run_until_finished
  @stop_condition.signal true
end

#run_job ⇒ Object



109
110
111
# File 'lib/elaine/distributed/coordinator.rb', line 109

# Run the job to completion without touching the stop condition
# (unlike #run_and_stop). Returns whatever #run_until_finished returns.
def run_job; run_until_finished; end

#run_until_finished ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/elaine/distributed/coordinator.rb', line 65

# Drive a whole job:
#   1. partition the graph across the registered workers;
#   2. give every worker's :postoffice the vertex id => node id directory;
#   3. send each worker its slice of vertices via init_graph;
#   4. run supersteps until no worker reports active vertices.
#
# Each superstep has two phases, :init_superstep then :superstep. In each
# phase the worker list is re-queried for workers whose `active` count is
# positive. Those workers are started through futures, and every future is
# awaited before the next phase begins.
#
# NOTE(review): the loop has no upper bound on supersteps.
#
# @return [Object] the result of the final debug call
def run_until_finished
  debug "partitioning"
  partition

  debug "building zipcodes"
  directory = zipcodes
  debug "distributing zipcodes"
  @workers.each { |node_id| DCell::Node[node_id][:postoffice].zipcodes = directory }

  debug "distributing graph"
  @partitions.each_pair { |node_id, slice| DCell::Node[node_id][:worker].init_graph slice }

  debug "Running job"
  # Remote worker actor for a node id.
  worker_for = ->(node_id) { DCell::Node[node_id][:worker] }
  # Node ids whose worker currently reports a positive active count.
  busy_nodes = -> { @workers.select { |node_id| worker_for.call(node_id).active > 0 } }
  # Start +phase+ on every busy worker, then block until all have finished.
  run_phase = lambda do |phase|
    pending = busy_nodes.call.map { |node_id| worker_for.call(node_id).future(phase) }
    pending.each(&:value)
  end

  superstep_count = 0
  loop do
    superstep_count += 1
    debug "Initializing superstep #{superstep_count}"
    run_phase.call(:init_superstep)

    debug "Running superstep #{superstep_count}"
    run_phase.call(:superstep)

    break if busy_nodes.call.empty?
  end
  debug "Job finished!"
end

#stop ⇒ Object



113
114
115
116
117
118
# File 'lib/elaine/distributed/coordinator.rb', line 113

# Ask every registered worker to stop, without waiting for them (async
# calls), then signal the stop condition with +true+.
#
# @return [Object] the result of signalling the stop condition
def stop
  @workers.each { |node_id| DCell::Node[node_id][:worker].async.stop }
  @stop_condition.signal(true)
end

#vertex_values(&block) ⇒ Object



125
126
127
128
129
130
# File 'lib/elaine/distributed/coordinator.rb', line 125

# Collect vertex_values from every registered worker, in registration order,
# into one flat array.
#
# NOTE(review): the block parameter is accepted but never used. Also,
# Array#flatten is fully recursive, so any nested arrays returned by a
# worker are flattened as well; confirm this is intended.
#
# @return [Array]
def vertex_values(&block)
  per_worker = @workers.map { |node_id| DCell::Node[node_id][:worker].vertex_values }
  per_worker.flatten
end

#zipcodes ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/elaine/distributed/coordinator.rb', line 31

# Invert @partitions into a directory mapping each vertex's :id to the
# worker node id that owns it. If an id appears in more than one slice,
# the later assignment wins.
#
# @return [Hash] vertex id => worker node id
def zipcodes
  @partitions.each_with_object({}) do |(node_id, vertices), directory|
    vertices.each { |vertex| directory[vertex[:id]] = node_id }
  end
end