Class: Elaine::Distributed::Coordinator
- Inherits:
-
Object
- Object
- Elaine::Distributed::Coordinator
- Includes:
- Celluloid, Celluloid::Logger
- Defined in:
- lib/elaine/distributed/coordinator.rb
Instance Attribute Summary collapse
-
#num_partitions ⇒ Object
readonly
Returns the value of attribute num_partitions.
-
#partitions ⇒ Object
readonly
Returns the value of attribute partitions.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers (the worker node identifiers added by #register_worker).
Instance Method Summary collapse
- #graph=(g) ⇒ Object
-
#initialize(graph: nil, num_partitions: 1, stop_condition: Celluloid::Condition.new) ⇒ Coordinator
constructor
A new instance of Coordinator.
- #partition ⇒ Object
- #register_worker(worker_node) ⇒ Object
- #run_and_stop ⇒ Object
- #run_job ⇒ Object
- #run_until_finished ⇒ Object
- #stop ⇒ Object
- #vertex_values(&block) ⇒ Object
- #zipcodes ⇒ Object
Constructor Details
#initialize(graph: nil, num_partitions: 1, stop_condition: Celluloid::Condition.new) ⇒ Coordinator
Returns a new instance of Coordinator.
16 17 18 19 20 21 22 23 |
# File 'lib/elaine/distributed/coordinator.rb', line 16
# Create a coordinator with no registered workers and an empty partition table.
#
# @param graph [Object, nil] the vertices to distribute; #partition later calls
#   #size and #each_slice on it, and #zipcodes reads vertex[:id] from each element
# @param num_partitions [Object] stored and exposed through #num_partitions
#   (NOTE(review): #partition splits by worker count and does not read it)
# @param stop_condition [Object] signalled with +true+ by #stop and #run_and_stop;
#   a fresh Celluloid::Condition is created per call when omitted
def initialize(graph: nil, num_partitions: 1, stop_condition: Celluloid::Condition.new)
  @graph = graph
  @num_partitions = num_partitions
  @stop_condition = stop_condition

  # Start with nothing registered and nothing partitioned.
  @workers = []
  @partitions = {}

  info "GOT GRAPH: #{graph}"
end
Instance Attribute Details
#num_partitions ⇒ Object (readonly)
Returns the value of attribute num_partitions.
14 15 16 |
# File 'lib/elaine/distributed/coordinator.rb', line 14
# Number of partitions requested when the coordinator was built (defaults to 1).
#
# NOTE(review): #partition sizes its slices by the number of registered
# workers and does not consult this value.
#
# @return [Object] the +num_partitions:+ value given to #initialize
def num_partitions
  @num_partitions
end
#partitions ⇒ Object (readonly)
Returns the value of attribute partitions.
13 14 15 |
# File 'lib/elaine/distributed/coordinator.rb', line 13
# Worker node => slice of vertices, as assigned by the most recent #partition
# call. This is an empty Hash until #partition has run.
#
# @return [Hash]
def partitions
  @partitions
end
#workers ⇒ Object (readonly)
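Returns the value of attribute workers: the worker node identifiers added by #register_worker and looked up through DCell::Node[...]. (The text `finalizer :shutdown` previously shown here appears to be a commented-out Celluloid finalizer declaration that YARD picked up as documentation; no #shutdown method is listed for this class.)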
12 13 14 |
# File 'lib/elaine/distributed/coordinator.rb', line 12
# Worker node identifiers registered through #register_worker, in
# registration order and without duplicates.
#
# @return [Array]
def workers
  @workers
end
Instance Method Details
#graph=(g) ⇒ Object
25 26 27 28 29 |
# File 'lib/elaine/distributed/coordinator.rb', line 25
# Replace the graph that the next #partition call will distribute.
# A debug line is logged before and after the assignment.
#
# @param g [Object] the new graph (see #initialize for what is read from it)
def graph=(g)
  debug "Setting graph"
  @graph = g
  debug "done setting graph"
end
#partition ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/elaine/distributed/coordinator.rb', line 41
# Split the graph's vertices into contiguous slices, one per registered
# worker, and record the assignment in @partitions.
#
# The slice size is ceil(vertex count / worker count), so at most
# @workers.size slices are produced. When there are fewer vertices than
# workers, the trailing workers receive no entry at all.
#
# @return [Hash] worker node => Array of vertices. This is an empty Hash when
#   the graph has no vertices.
# @raise [RuntimeError] if no graph has been set, or if vertices must be
#   placed but no worker has been registered. (Previously these cases crashed
#   with NoMethodError / FloatDomainError, and an empty graph crashed with
#   ArgumentError from each_slice(0).)
def partition
  # Rebuilt from scratch on every call so stale assignments never survive
  # into a new run.
  @partitions = {}

  raise "cannot partition: no graph has been set" if @graph.nil?
  return @partitions if @graph.size.zero?
  raise "cannot partition: no workers have been registered" if @workers.empty?

  # Exact integer ceil-division; always >= 1 here because the graph is non-empty.
  slice_size = (@graph.size + @workers.size - 1) / @workers.size
  @graph.each_slice(slice_size).with_index do |slice, index|
    @partitions[@workers[index]] = slice
  end
  @partitions
end
#register_worker(worker_node) ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/elaine/distributed/coordinator.rb', line 54
# Add a worker node to the coordinator. Registering the same node twice is
# a no-op apart from the log line.
#
# Design note carried over from the original author: one worker per node is
# intended. To scale up on one machine, run several nodes rather than several
# workers inside a single node. This should be revisited at some point.
#
# @param worker_node [Object] node identifier later passed to DCell::Node[...]
# @return [Array, nil] the workers list if the node was added, nil if it was
#   already registered
def register_worker(worker_node)
  info "Registering worker: #{worker_node}"
  @workers << worker_node unless @workers.include?(worker_node)
end
#run_and_stop ⇒ Object
120 121 122 123 |
# File 'lib/elaine/distributed/coordinator.rb', line 120
# Run the job to completion, then signal the stop condition with +true+.
#
# NOTE(review): if #run_until_finished raises, the condition is never
# signalled.
#
# @return whatever +@stop_condition.signal(true)+ returns
def run_and_stop
  run_until_finished
  @stop_condition.signal true
end
#run_job ⇒ Object
109 110 111 |
# File 'lib/elaine/distributed/coordinator.rb', line 109
# Public entry point that runs the job to completion without signalling the
# stop condition (compare #run_and_stop).
#
# @return whatever #run_until_finished returns
def run_job
  run_until_finished
end
#run_until_finished ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/elaine/distributed/coordinator.rb', line 65
# Run a complete job:
#   1. partition the graph;
#   2. send every worker node's postoffice the vertex-id => node routing table;
#   3. send each partitioned node its slice of vertices;
#   4. drive supersteps until no worker reports active vertices.
#
# Each superstep has two phases (init_superstep, then superstep). In each
# phase the set of workers with `active > 0` is re-queried, the phase is
# started on each of them with `future(...)`, and every future's #value is
# awaited before moving on. After a superstep, the loop ends once every
# worker reports an `active` of zero.
#
# NOTE(review): `active` is queried on every worker three times per
# iteration (init phase, superstep phase, termination check). Each query is a
# separate remote call.
#
# @return whatever the final `debug` call returns
def run_until_finished
  remote_worker = ->(node) { DCell::Node[node][:worker] }
  active_nodes = -> { @workers.select { |node| remote_worker.call(node).active > 0 } }

  debug "partitioning"
  partition

  debug "building zipcodes"
  routing = zipcodes
  debug "distributing zipcodes"
  @workers.each { |node| DCell::Node[node][:postoffice].zipcodes = routing }

  debug "distributing graph"
  @partitions.each_pair { |node, vertices| remote_worker.call(node).init_graph vertices }

  debug "Running job"
  step_num = 0
  loop do
    step_num += 1

    debug "Initializing superstep #{step_num}"
    active_nodes.call.map { |node| remote_worker.call(node).future(:init_superstep) }.each(&:value)

    debug "Running superstep #{step_num}"
    active_nodes.call.map { |node| remote_worker.call(node).future(:superstep) }.each(&:value)

    break if active_nodes.call.empty?
  end
  debug "Job finished!"
end
#stop ⇒ Object
113 114 115 116 117 118 |
# File 'lib/elaine/distributed/coordinator.rb', line 113
# Ask every registered worker to stop through its `async` proxy, in
# registration order. Then signal the stop condition with +true+.
#
# @return whatever +@stop_condition.signal(true)+ returns
def stop
  @workers.each { |node| DCell::Node[node][:worker].async.stop }
  @stop_condition.signal(true)
end
#vertex_values(&block) ⇒ Object
125 126 127 128 129 130 |
# File 'lib/elaine/distributed/coordinator.rb', line 125
# Collect the vertex values held by every registered worker, in registration
# order, into one fully flattened Array.
#
# NOTE(review): the block parameter is accepted but never used. Also, `flatten`
# is recursive, so any Array-valued entries returned by a worker are flattened
# as well.
#
# @return [Array]
def vertex_values(&block)
  per_worker = @workers.map { |node| DCell::Node[node][:worker].vertex_values }
  per_worker.flatten
end
#zipcodes ⇒ Object
31 32 33 34 35 36 37 38 39 |
# File 'lib/elaine/distributed/coordinator.rb', line 31
# Build the routing table from the current @partitions. It maps each vertex
# id (vertex[:id]) to the worker node whose partition holds that vertex. If
# an id appears in more than one partition, the later partition wins.
#
# @return [Hash] vertex id => worker node
def zipcodes
  @partitions.each_with_object({}) do |(zip, vertices), table|
    vertices.each { |vertex| table[vertex[:id]] = zip }
  end
end