Class: Pmux::TaskDispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/pmux/task_dispatcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options, adapter, msession, gatherer = nil) ⇒ TaskDispatcher

Returns a new instance of TaskDispatcher.



8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/pmux/task_dispatcher.rb', line 8

def initialize options, adapter, msession, gatherer=nil
  @options = options
  @adapter = adapter
  @msession = msession
  @gatherer = gatherer || Gatherer.new(STDOUTWriter.new)
  @scheduler = TaskScheduler.new adapter

  @verbose = options[:verbose]
  @on_error = proc {|r|
    $stderr.write "%s: %s, %s\n" %
      [r['error'], r['error_message'], r['backtrace']]
  }
end

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



3
4
5
# File 'lib/pmux/task_dispatcher.rb', line 3

# Reader for the adapter object that was passed to the constructor.
#
# @return [Object] the current value of @adapter
def adapter
  @adapter
end

#gatherer ⇒ Object (readonly)

Returns the value of attribute gatherer.



4
5
6
# File 'lib/pmux/task_dispatcher.rb', line 4

# Reader for the gatherer: either the one passed to the constructor or the
# default Gatherer (wrapping an STDOUTWriter) created when none was given.
#
# @return [Object] the current value of @gatherer
def gatherer
  @gatherer
end

#jl ⇒ Object (readonly)

Returns the value of attribute jl.



6
7
8
# File 'lib/pmux/task_dispatcher.rb', line 6

# Reader for the job logger. @jl is assigned a Joblogger inside #run and is
# not set by the constructor, so this is nil until #run has been called.
#
# @return [Object, nil] the current value of @jl
def jl
  @jl
end

#mf_shuffle ⇒ Object (readonly)

Returns the value of attribute mf_shuffle.



5
6
7
# File 'lib/pmux/task_dispatcher.rb', line 5

# Reader for the shuffle multi-future. @mf_shuffle is assigned an
# MR::MultiFuture in #run, and only when the job's num_r is nonzero; the
# constructor does not set it.
#
# @return [Object, nil] the current value of @mf_shuffle
def mf_shuffle
  @mf_shuffle
end

#msession ⇒ Object (readonly)

Returns the value of attribute msession.



4
5
6
# File 'lib/pmux/task_dispatcher.rb', line 4

# Reader for the session object that was passed to the constructor.
#
# @return [Object] the current value of @msession
def msession
  @msession
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



3
4
5
# File 'lib/pmux/task_dispatcher.rb', line 3

# Reader for the options that were passed to the constructor.
#
# @return [Object] the current value of @options
def options
  @options
end

#scheduler ⇒ Object (readonly)

Returns the value of attribute scheduler.



4
5
6
# File 'lib/pmux/task_dispatcher.rb', line 4

# Reader for the TaskScheduler the constructor created for the adapter.
#
# @return [Object] the current value of @scheduler
def scheduler
  @scheduler
end

Instance Method Details

#on_error(&block) ⇒ Object



103
104
105
# File 'lib/pmux/task_dispatcher.rb', line 103

# Installs the handler that #receive_result invokes with the result hash
# when a result carries an 'error' entry. Calling this without a block
# stores nil, i.e. removes the handler.
#
# @yieldparam result [Hash] the result that reported the error
# @return [Proc, nil] the handler that was stored
def on_error(&handler)
  @on_error = handler
end

#receive_result(job, result) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/pmux/task_dispatcher.rb', line 107

# Processes the result of one finished task: reports errors, collects or
# forwards the task's output, records timing in the job log, removes the
# task from the job and lets the scheduler dispatch more work.
#
# @param job [Object] the job the task belongs to; this method uses its
#   id, num_r, reducers, tasks, taskhash, get_task_by_id,
#   delete_task_by_id, set_failed and job[:job_started_at]
# @param result [Hash] result hash keyed by strings; the keys read here are
#   'task_id', 'node_addr', 'ifbase', 'error', 'result_body',
#   'output_path', 'welapse', 'task_keys' and 'map'
# @return [Object] whatever the final Log.info call returns
def receive_result(job, result)
  task_id, node_addr, ifbase =
    result.values_at('task_id', 'node_addr', 'ifbase')
  Log.info "receive result #{task_id} from #{node_addr}"
  puts "receive result #{task_id} from #{node_addr}" if @verbose

  # NOTE(review): when no handler is installed (on_error called without a
  # block) an error result is neither reported nor marks the job failed.
  # That is the pre-existing behaviour; confirm it is intended.
  if result['error'] && @on_error
    @on_error.call result
    job.set_failed
  end

  if ifbase
    if job.num_r.zero?
      # no reducers; get the intermediate file from the mapper node
      ifpath = ifbase + '-0'
      local = local_work_path(job, ifpath)
      if (body = result['result_body'])
        # The result carried its body inline; an empty body produces no file.
        if body.size > 0
          File.open(local, 'w') { |f| f.write body }
          gatherer.writer.write local
        end
      else
        puts "gather #{node_addr}:#{ifpath} -> #{local}" if @verbose
        Log.info "gather #{node_addr}:#{ifpath} -> #{local}"
        gatherer.gather msession, node_addr, ifpath, local
      end
    else
      # send 'notify_reduce' message to the reducer node
      job.reducers.each_with_index do |reducer_addr, pindex|
        puts "send notify_reduce #{task_id} to #{reducer_addr}" if @verbose
        future = msession.call_async reducer_addr, :notify_reduce,
          :job_id => job.id, :task_id => task_id, :pindex => pindex,
          :node_addr => node_addr, :ifbase => ifbase
        mf_shuffle.add future
      end
    end
  elsif (output_path = result['output_path'])
    # reduced result
    gatherer.gather msession, node_addr, output_path,
      local_work_path(job, output_path)
  end

  task = job.get_task_by_id task_id
  alloc_time = task[:alloc_time]
  # Both timings depend on alloc_time; without it they are logged as nil
  # instead of raising on nil arithmetic.
  if alloc_time
    allocated_at = alloc_time - job[:job_started_at]
    elapse = Time.now - alloc_time
  end
  jl.add task_id, :node_addr => node_addr, :ifbase => ifbase,
    :welapse => result['welapse'], :elapse => elapse,
    :allocated_at => allocated_at

  # delete task
  scheduler.delete_task_from_job job, task, node_addr
  if (task_keys = result['task_keys'])
    task_keys.each_key { |tid| job.delete_task_by_id tid }
  end

  scheduler.process_queue

  total = job.tasks.size
  # Guard the integer division for a job with an empty task list.
  perc = total.zero? ? 100 : 100 * (total - job.taskhash.size) / total
  task_type = result['map'] ? 'map' : 'reduce'
  Log.info "done #{task_type} task #{result['task_id']} (#{perc}%)"
end

# Maps a remote file path to its path in this job's local work directory:
# <tmp_dir>/w<job id>/<basename with a leading 'm' or 'r' replaced by 'w'>.
def local_work_path(job, remote_path)
  local_name = File.basename(remote_path).sub(/^[mr]/, 'w')
  "#{@options[:tmp_dir]}/w#{job.id}/#{local_name}"
end
private :local_work_path

#run(job) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/pmux/task_dispatcher.rb', line 22

# Runs a job from start to finish: registers it with the scheduler,
# initialises the remote nodes, drives the session's event loop until the
# map phase (and, when the job has reducers, the reduce phase) completes,
# then writes the job-log footer, tells the nodes to quit and runs the
# cleaner.
#
# @param job [Object] the job to execute; this method uses its id, num_r,
#   tasks, failed, completed?, set_failed, mk_reduce_tasks and
#   job[:job_finished_at]=
# @return [Object] whatever Cleaner#run returns
def run(job)
  # The shuffle multi-future is only created for jobs that have reducers.
  @mf_shuffle = MR::MultiFuture.new if job.num_r.nonzero?
  @jl = Joblogger.new(options[:log_dir], job)

  scheduler.push_job(job)
  scheduler.attach_flush_callback do |node_addr, task|
    # Send a copy of the task without the two scheduler-side entries.
    payload = task.dup
    payload.delete(:node_addrs)
    payload.delete(:alloc_time)
    Log.info "send task #{payload[:task_id]} to #{node_addr}"
    printf("send task %s to %s\n", payload[:task_id], node_addr) if @verbose
    pending = msession.call_async(node_addr, :exec_streaming_task, payload)
    pending.attach_callback { |done| receive_result(job, done.get) }
    # err callback ?
  end

  # init job
  wdir = "#{options[:tmp_dir]}/w#{job.id}"
  Dir.mkdir(wdir)
  jl.dump_header
  jl.sep
  log_base = options[:log_dir] || wdir
  Log.init "#{log_base}/dispatcher.log"

  puts 'send "init_job"' if @verbose
  mf_init = msession.multicast_call_async(:init_job, job.id)
  mf_init.on_success do |future|
    addr = future.addr
    reply = future.get
    remote_dir = reply['job_dir'] # remote job_dir
    adapter[addr][:job_dir] = remote_dir
    adapter[addr][:num_workers] = reply['num_cpu'] || 2
    printf("%s: remote job_dir = %s\n", addr, remote_dir) if @verbose

    # Marks the node as ready and lets the scheduler hand out tasks.
    start_node = proc do
      scheduler.shipped[addr] = true
      scheduler.process_queue
    end

    if (ship_files = options[:ship_files])
      # scp ship_files to remote job_dir; the node starts once all uploads
      # have finished.
      upload = msession.scp_upload_files(addr, ship_files, remote_dir)
      upload.on_all(&start_node)
      upload.on_error { job.set_failed }
    else
      puts 'start scheduler' if @verbose
      start_node.call
    end
  end
  mf_init.on_error { job.set_failed }

  # wait for all map tasks to finish
  msession.loop.run_once until job.completed?

  unless job.failed
    unless job.num_r.zero?
      mf_shuffle.join_all
      # reduce phase
      job.mk_reduce_tasks
      scheduler.inject_tasks(job.tasks)
      scheduler.process_queue
      # wait for all reduce tasks to finish
      msession.loop.run_once until job.completed?
    end
    # Both the map-only and the map/reduce paths end by draining the gatherer.
    gatherer.join_all
  end

  Log.info "END"
  job[:job_finished_at] = Time.now
  jl.dump_footer
  jl.close

  msession.multicast_call_async(:quit).join_all

  # NOTE(review): when options[:log_dir] is nil the second pattern is
  # "/*.yml"; confirm Cleaner is meant to receive that.
  Cleaner.new("#{options[:tmp_dir]}/w*", "#{options[:log_dir]}/*.yml").run
end