2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/skynet/skynet_ruby_extensions.rb', line 2
def mapreduce(klass=nil,options={},&block)
data = []
if self.is_a?(Hash)
self.each {|k,v| data << {k => v}}
else
data = self
end
jobopts = {
:mappers => 20000,
:map_data => data,
:name => "#{klass} Enumerable MASTER",
:map_name => "#{klass} Enumerable MAP",
:reduce_name => "#{klass} Enumerable REDUCE",
:map_timeout => 3600,
:reduce_timeout => 3600,
:master_timeout => 3600,
:master_result_timeout => 3600
}
jobopts[:map_reduce_class] = klass.to_s if klass
options.each { |k,v| jobopts[k] = v }
if block_given?
jobopts[:map] = block
end
if block_given? or not jobopts[:async]
job = Skynet::Job.new(jobopts.merge(:local_master => true))
else
job = Skynet::AsyncJob.new(jobopts)
end
job.run
end
|