Class: Tem::Mr::Search::MapReducePlanner

Inherits:
Object
Defined in:
lib/tem_mr_search/map_reduce_planner.rb

Overview

Allocates the individual components of a Map-Reduce job across TEMs.

This class is instantiated and used by MapReduceExecutor. It should not be used directly in client code, except for the purpose of replacing the default planner.

The Map-Reduce coordinator calls next_actions! on the planner to obtain a list of actions that can be carried out. The planner guarantees that these actions are independent of each other and that all their dependencies are satisfied. When the coordinator learns that an action has completed, it updates the planner’s state by calling action_done with that action. After action_done is called, the coordinator should call next_actions! again to obtain the actions that have become possible.
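The protocol above can be sketched as a sequential driver loop. This is a minimal sketch, not MapReduceExecutor's actual code: run_planner and ScriptedPlanner are hypothetical names, the loop only records each action instead of sending it to TEM action[:with], and ScriptedPlanner merely replays fixed batches so that the loop can be exercised without real TEMs.

```ruby
# Sequential driver loop for a planner that exposes done?, next_actions!
# and action_done (a sketch; a real executor would send each action to
# TEM action[:with], possibly in parallel, before reporting it back).
def run_planner(planner)
  log = []
  until planner.done?
    actions = planner.next_actions!
    # Every issued action is reported back before the next call, so an
    # empty batch here means the planner can make no further progress.
    raise 'planner stalled' if actions.empty?
    actions.each do |action|
      log << action # stand-in for actually performing the action
      planner.action_done action
    end
  end
  log
end

# Hypothetical stand-in planner that replays fixed batches of actions.
# It only exists to exercise run_planner; it does no real planning.
class ScriptedPlanner
  attr_reader :output_id

  def initialize(batches)
    @batches = batches.dup
    @output_id = nil
  end

  def done?
    !@output_id.nil?
  end

  def next_actions!
    @batches.shift || []
  end

  def action_done(action)
    @output_id = action[:final_id] if action[:action] == :finalize
  end
end

# Illustrative two-item job on a single TEM (IDs chosen for the example).
batches = [
  [{ :action => :map, :item_id => 0, :with => 0, :output_id => 0 }],
  [{ :action => :map, :item_id => 1, :with => 0, :output_id => 1 }],
  [{ :action => :reduce, :output1_id => 0, :output2_id => 1, :with => 0,
     :output_id => 2 }],
  [{ :action => :finalize, :output_id => 2, :with => 0, :final_id => 3 }]
]
planner = ScriptedPlanner.new(batches)
log = run_planner(planner)
```

An executor that keeps several actions in flight at once could not treat an empty batch as a stall; it would instead wait for outstanding completions before calling next_actions! again.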

Partial results (outputs) in the Map-Reduce computation are identified by unique numbers starting from 0. The output IDs can be used as file names, if the outputs are stored in a distributed file system. When the computation is done (calling done? returns true), the output_id attribute will contain the ID of the computation’s final result.
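As a worked example of the ID space, assume each map produces one partial output and each reduce merges two partial outputs into one, which is consistent with the constructor setting @last_reduce_id = 2 * num_items - 2. A job over n items then produces n map outputs and n - 1 reduce outputs, 2n - 1 IDs in total, numbered 0 through 2n - 2. The helpers below are hypothetical, and simulate_ids hands out IDs sequentially purely for illustration; the planner's actual assignment order may differ.

```ruby
# Highest partial-output ID for a job over num_items items, assuming one
# output per map and one output per pairwise reduce.
def last_partial_output_id(num_items)
  raise ArgumentError, 'need at least one item' if num_items < 1
  map_outputs = num_items          # one output per item
  reduce_outputs = num_items - 1   # each reduce turns two outputs into one
  map_outputs + reduce_outputs - 1 # IDs start at 0
end

# Cross-check by simulation: reduce outputs pairwise, handing out new IDs
# sequentially, and return the ID of the single remaining output.
def simulate_ids(num_items)
  queue = (0...num_items).to_a # map outputs get IDs 0..num_items-1
  next_id = num_items
  while queue.length > 1
    queue.shift(2)             # consume two partial outputs...
    queue << next_id           # ...and append the reduce's output
    next_id += 1
  end
  queue.first
end

last_partial_output_id(4) # => 6
simulate_ids(4)           # => 6
```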

Constructor Details

#initialize(job, num_items, num_tems, root_tems) ⇒ MapReducePlanner

Creates a planner for a Map-Reduce job.

Arguments:

job:: the Map-Reduce job (see Tem::Mr::Search::MapReduceJob)
num_items:: the number of data items that the Map-Reduce job runs over
num_tems:: the number of available TEMs
root_tems:: the indexes of the TEMs that have the initial SECpacks bound
            to them (a hash with the keys :mapper, :reducer and
            :finalizer)


# File 'lib/tem_mr_search/map_reduce_planner.rb', line 42

def initialize(job, num_items, num_tems, root_tems)
  @job = job
  @root_tems = root_tems
  
  @without = { :mapper => RBTree.new, :reducer => RBTree.new }
  @with = { :mapper => Set.new([@root_tems[:mapper]]),
            :reducer => Set.new([@root_tems[:reducer]]) }
  @free_tems = RBTree.new
  
  # TEM ordering: the mapper root is first, the reducer root is last, and the
  #               finalizer root is second
  @ordered_tems = (0...num_tems).to_a
  @ordered_tems -= @root_tems.values
  @ordered_tems = [@root_tems[:mapper]] + @ordered_tems
  unless @ordered_tems.include? @root_tems[:reducer]
    @ordered_tems += [@root_tems[:reducer]]
  end
  unless @ordered_tems.include? @root_tems[:finalizer]
    @ordered_tems = [@ordered_tems[0], @root_tems[:finalizer]] +
                     @ordered_tems[1..-1]
  end
  # Reverse index for the TEM ordering: @rindex_tems[tem] is tem's position.
  @rindex_tems = Array.new(num_tems)
  @ordered_tems.each_with_index { |t, i| @rindex_tems[t] = i }
  
  @ordered_tems.each_with_index do |tem, i|
    @free_tems[[i, tem]] = true
    @without.each { |k, v| v[[i, tem]] = true unless tem == @root_tems[k] }
  end
  
  @unmapped_items = (0...num_items).to_a.reverse
  @reduce_queue = RBTree.new
  @last_output_id = 0
  @last_reduce_id = 2 * num_items - 2
  @done_reducing, @output_id = false, nil
end
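The TEM-ordering step above can be pulled out into standalone functions to see what it produces. order_tems and reverse_index are hypothetical names; their bodies mirror the constructor's @ordered_tems and @rindex_tems logic. With five TEMs and roots { :mapper => 2, :reducer => 0, :finalizer => 4 }, the mapper root comes first, the finalizer root second, the reducer root last, and the remaining TEMs keep their index order in between.

```ruby
# Standalone copy of the constructor's TEM-ordering logic.
def order_tems(num_tems, root_tems)
  ordered = (0...num_tems).to_a - root_tems.values
  ordered = [root_tems[:mapper]] + ordered
  # The include? checks keep a TEM that holds several roots from repeating.
  ordered += [root_tems[:reducer]] unless ordered.include?(root_tems[:reducer])
  unless ordered.include?(root_tems[:finalizer])
    ordered = [ordered[0], root_tems[:finalizer]] + ordered[1..-1]
  end
  ordered
end

# Reverse index: rindex[tem] is the position of tem in the ordering.
def reverse_index(ordered, num_tems)
  rindex = Array.new(num_tems)
  ordered.each_with_index { |tem, i| rindex[tem] = i }
  rindex
end

ordered = order_tems(5, { :mapper => 2, :reducer => 0, :finalizer => 4 })
# ordered => [2, 4, 1, 3, 0]
reverse_index(ordered, 5) # => [4, 2, 0, 3, 1]
```

When roots coincide (for example, all three on TEM 0), the include? checks keep that TEM from appearing twice, so in that case the reducer root is not last.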

Instance Attribute Details

#output_id ⇒ Object (readonly)

The output ID of the Map-Reduce’s final result.



# File 'lib/tem_mr_search/map_reduce_planner.rb', line 136

def output_id
  @output_id
end

Instance Method Details

#action_done(action) ⇒ Object

Informs the planner that an action issued by next_actions! was completed.

Args:

action:: an action hash, as returned by next_actions!

The return value is not specified.



# File 'lib/tem_mr_search/map_reduce_planner.rb', line 124

def action_done(action)
  dispatch = { :migrate => :done_migrating, :map => :done_mapping,
               :reduce => :done_reducing, :finalize => :done_finalizing }
  self.send dispatch[action[:action]], action
end
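action_done maps the :action value to a handler method name and invokes it with send. The sketch below reproduces that pattern in a self-contained class; DispatchDemo and its handler bodies are hypothetical stand-ins that only record which handler ran. It swaps the plain dispatch[...] lookup for Hash#fetch, so an unrecognized :action raises a KeyError naming the bad key; with the plain lookup, send receives nil and fails with a less descriptive error.

```ruby
# Symbol dispatch as in action_done, using Hash#fetch for unknown types.
class DispatchDemo
  DISPATCH = { :migrate => :done_migrating, :map => :done_mapping,
               :reduce => :done_reducing, :finalize => :done_finalizing }.freeze

  attr_reader :handled

  def initialize
    @handled = []
  end

  def action_done(action)
    # send can invoke the private handlers below.
    send DISPATCH.fetch(action[:action]), action
  end

  private

  # Hypothetical handlers: each records the action type and one key.
  def done_migrating(action); @handled << [:migrate, action[:to]]; end
  def done_mapping(action); @handled << [:map, action[:output_id]]; end
  def done_reducing(action); @handled << [:reduce, action[:output_id]]; end
  def done_finalizing(action); @handled << [:finalize, action[:final_id]]; end
end

demo = DispatchDemo.new
demo.action_done({ :action => :map, :item_id => 0, :with => 0, :output_id => 0 })
demo.handled # => [[:map, 0]]
```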

#done? ⇒ Boolean

True when the Map-Reduce computation is complete.

Returns:

  • (Boolean)


# File 'lib/tem_mr_search/map_reduce_planner.rb', line 131

def done?
  !@output_id.nil?
end

#next_actions! ⇒ Object

Issues a set of actions that can be performed right now.

The method alters the planner’s state assuming the actions will be performed.

Returns an array of hashes, one per action to be performed. The :action key specifies the type of action and is one of :migrate, :map, :reduce, or :finalize. Every action has a :with key, which holds the ID (0-based index) of the TEM that will perform the action.

Migrate actions have the following keys:

:secpack:: the type of SECpack to be migrated (:mapper or :reducer)
:with:: the ID of the TEM doing the migration
:to:: the ID of the TEM that the SECpack should be migrated to

Map actions have the following keys:

:item_id:: the ID of the item to be mapped (number in Table-Scan order)
:with:: the ID of the TEM doing the mapping
:output_id:: ID for the result of the map operation

Reduce actions have the following keys:

:output1_id, :output2_id:: the IDs of the partial outputs to be reduced
:with:: the ID of the TEM doing the reducing
:output_id:: the ID for the result of the reduce operation

The finalize action has the following keys:

:output_id:: the ID of the last partial output, which will be finalized
:with:: the ID of the TEM doing the finalization
:final_id:: the ID for the computation's final result
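The key lists above can be encoded as data, which makes it easy for a custom executor or a replacement planner to sanity-check action hashes. ACTION_KEYS and missing_action_keys are hypothetical helpers, not part of the library.

```ruby
# Required keys per action type, transcribed from the lists above.
ACTION_KEYS = {
  :migrate  => [:secpack, :with, :to],
  :map      => [:item_id, :with, :output_id],
  :reduce   => [:output1_id, :output2_id, :with, :output_id],
  :finalize => [:output_id, :with, :final_id]
}.freeze

# Returns the required keys that the action hash lacks (empty when valid).
# Raises ArgumentError for an unrecognized :action value.
def missing_action_keys(action)
  required = ACTION_KEYS.fetch(action[:action]) do |type|
    raise ArgumentError, "unknown action type #{type.inspect}"
  end
  required.reject { |key| action.key?(key) }
end

missing_action_keys({ :action => :reduce, :output1_id => 0, :with => 2,
                      :output_id => 4 }) # => [:output2_id]
```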


# File 'lib/tem_mr_search/map_reduce_planner.rb', line 109

def next_actions!
  actions = migrate_actions :mapper
  actions += migrate_actions :reducer
  actions += reduce_actions
  actions += map_actions
  actions += finalize_actions
  actions
end
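next_actions! concatenates migrations first, then reductions, maps, and finally the finalize action. Since the planner guarantees that the actions in one batch are independent, an executor may run them concurrently. The sketch below runs one batch with a thread per TEM; run_batch is a hypothetical helper, the block passed to it stands in for the real TEM work, and actions that share a TEM run one after another in that TEM's thread. Completions are reported afterwards from the calling thread, since nothing here indicates that the planner is thread-safe.

```ruby
# Runs one batch of independent actions, one thread per TEM (action[:with]).
# The block receives (tem, action) and stands in for the real TEM work.
# Thread#join re-raises any exception raised inside a worker thread.
def run_batch(actions, &perform)
  workers = actions.group_by { |action| action[:with] }.map do |tem, tem_actions|
    Thread.new { tem_actions.each { |action| perform.call(tem, action) } }
  end
  workers.each(&:join)
  actions
end

# Example: record which TEM handled which action type.
log = []
lock = Mutex.new
batch = [
  { :action => :migrate, :secpack => :mapper, :with => 0, :to => 1 },
  { :action => :reduce, :output1_id => 0, :output2_id => 1, :with => 2,
    :output_id => 4 }
]
run_batch(batch) { |tem, action| lock.synchronize { log << [tem, action[:action]] } }
# Afterwards, from this thread: batch.each { |action| planner.action_done action }
```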