Class: Elaine::Distributed::Worker

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Logger
Defined in:
lib/elaine/distributed/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(coordinator_node: "elaine.coordinator", g: [], zipcodes: {}, stop_condition: Celluloid::Condition.new) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
# File 'lib/elaine/distributed/worker.rb', line 12

def initialize(coordinator_node: "elaine.coordinator", g: [], zipcodes: {}, stop_condition: Celluloid::Condition.new)

  # @coordinator_node = DCell::Node["elaine.coordinator"]
  @coordinator_node = coordinator_node
  DCell::Node[@coordinator_node][:coordinator].register_worker DCell.me.id

  @vertices = []
  @superstep_num = 0
  @stop_condition = stop_condition
end

Instance Attribute Details

#activeObject (readonly)

Returns the value of attribute active.



9
10
11
# File 'lib/elaine/distributed/worker.rb', line 9

def active
  @active
end

#verticesObject (readonly)

Returns the value of attribute vertices.



9
10
11
# File 'lib/elaine/distributed/worker.rb', line 9

def vertices
  @vertices
end

#vertices2Object (readonly)

Returns the value of attribute vertices2.



9
10
11
# File 'lib/elaine/distributed/worker.rb', line 9

def vertices2
  @vertices2
end

Instance Method Details

#init_graph(g = []) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/elaine/distributed/worker.rb', line 23

def init_graph(g=[])
  raise 'empty worker graph' if g.empty?
  if @vertices.size > 0
    @vertices.each do |v|
      # Celluloid::Actor[v].terminate
    end
  end
  @vertices = []
  # raise "Graph already initialized!" if @vertices.size > 0

  # we are going to assume that graphs come in as json documents
  # *describing* the graph.
  # @vertices = graph
  # @active   = graph.size

  # HACK the local vertices should be dealt with differently than
  # the @vertices2 member
  @vertices2 = []
  g.each do |n|
    # n[:klazz].supervise_as n[:id], n[:id], n[:value], Celluloid::Actor[:postoffice], n[:outedges]
    @vertices << n[:id]
    v = n[:klazz].new n[:id], n[:value], Celluloid::Actor[:postoffice], n[:outedges]
    @vertices2 << v
  end
  @active = @vertices.size

  debug "There are #{@vertices.size} vertices in this worker."

end

#init_superstepObject

HACK this should be handled better…



54
55
56
57
58
59
# File 'lib/elaine/distributed/worker.rb', line 54

def init_superstep
  @vertices2.each do |v|
    v.messages = Celluloid::Actor[:postoffice].read_all(v.id)
  end
  debug "#{DCell.me.id} finished init_superstep"
end

#pmap(enum, &block) ⇒ Object



61
62
63
64
# File 'lib/elaine/distributed/worker.rb', line 61

def pmap(enum, &block)
  futures = enum.map { |elem| Celluloid::Future.new(elem, &block) }
  futures.map { |future| future.value }
end

#stopObject



66
67
68
# File 'lib/elaine/distributed/worker.rb', line 66

def stop
  @stop_condition.signal(true)
end

#superstepObject



70
71
72
73
74
75
76
77
78
# File 'lib/elaine/distributed/worker.rb', line 70

def superstep
  active = @vertices2.select {|v| v.active?}

  
  pmap(active) do |v|
    v.step
  end
  @active = active.select {|v| v.active?}.size    
end

#vertex_valuesObject



80
81
82
# File 'lib/elaine/distributed/worker.rb', line 80

def vertex_values
  @vertices2.map { |v| {id: v.id, value: v.value} }
end