Class: Tem::Mr::Search::MapReduceExecutor

Inherits:
Object
  • Object
show all
Defined in:
lib/tem_mr_search/map_reduce_executor.rb

Overview

Coordination code (executor) for performing a Map-Reduce computation.

The executor distributes the Map-Reduce computation across multiple TEMs. The strategy used to allocate tasks to TEMs is expressed by a MapReducePlanner class, and the executor instantiates that class. The executor is responsible for coordinating between the TEMs and the planner.

Instance Method Summary collapse

Constructor Details

#initialize(job, db, tems, root_tems, planner_class = nil) ⇒ MapReduceExecutor

Creates an executor for a Map-Reduce job.

Arguments:

job:: the Map-Reduce job (see Tem::Mr::Search::MapReduceJob)
db:: the database to run Map-Reduce over
tems:: sessions to the available TEMs
root_tems:: the indexes of the TEMs that have the initial SECpacks bound
            to them (hash with the keys +:mapper+, +:reducer+ and
            +:finalizer+)
planner_class:: (optional) replacement for the default planner strategy


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/tem_mr_search/map_reduce_executor.rb', line 30

def initialize(job, db, tems, root_tems, planner_class = nil)
  planner_class ||= MapReducePlanner
  
  @db = db  # Writable only in main thread.
  @tems = tems  # Writable only in main thread.
  
  # Protected by @lock during collect_tem_ids, read-only during execute.
  @tem_certs = Array.new @tems.length

  # Writable only in main thread.
  @planner = planner_class.new job, db.length, tems.length, root_tems
  
  # Protected by @lock
  @tem_parts = { :mapper => { root_tems[:mapper] => job.mapper },
                 :reducer => {root_tems[:reducer] => job.reducer },
                 :finalizer => { root_tems[:finalizer] => job.finalizer } }
  # Protected by @lock
  @outputs = {}
  
  # Protected by @lock
  @timings = { :tems => Array.new(@tems.length, 0.0),
               :tasks => { :map => 0.0, :reduce => 0.0, :finalize => 0.0,
                           :migrate => 0.0, :tem_ids => 0.0 } }
  
  # Thread-safe.
  @thread_queues = tems.map { |tem| Queue.new }
  @main_queue = Queue.new
  @lock = Mutex.new
end

Instance Method Details

#collect_tem_idsObject

Collects identification information from all the TEMs.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/tem_mr_search/map_reduce_executor.rb', line 87

def collect_tem_ids
  threads = (0...@tems.length).map do |tem_index|
    Thread.new(tem_index, @tems[tem_index]) do |index, tem|
      t0 = Time.now
      ecert = tem.endorsement_cert
      time_delta = Time.now - t0
      @lock.synchronize do
        @tem_certs[index] = ecert
        @timings[:tasks][:tem_ids] += time_delta
        @timings[:tems][index] += time_delta
      end
    end
  end
  threads.each { |thread| thread.join }
end

#executeObject

Executes the job.

Returns a hash with the following keys:

:result:: the job's result
:timings:: timing statistics on the job's execution


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/tem_mr_search/map_reduce_executor.rb', line 65

def execute
  t0 = Time.now
  collect_tem_ids
  
  # Spawn TEM threads.
  @tems.each_index { |i| Thread.new(i) { |i| executor_thread i } }
  
  until @planner.done?
    actions = @planner.next_actions!
    @lock.synchronize do
      actions.each { |action| @thread_queues[action[:with]] << action }
    end
    
    action = @main_queue.pop
    @planner.action_done action
  end
  @timings[:total] = Time.now - t0
  
  return { :result => @outputs[@planner.output_id], :timings => @timings }
end