Class: MapReduce::Reducer

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/reducer.rb

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Reducer

Returns a new instance of Reducer.



3
4
5
6
7
# File 'lib/map_reduce/reducer.rb', line 3

# Builds a reducer from an options hash.
#
# Recognised keys (all optional):
#   :masters - list of master endpoints; falls back to a one-element array
#              holding ::MapReduce::DEFAULT_SOCKET when missing or falsy.
#   :type    - connection type; falls back to :em when missing or falsy.
#   :task    - stored as-is (nil when not given).
def initialize(opts = {})
  @masters = opts[:masters]
  @masters ||= [::MapReduce::DEFAULT_SOCKET]

  @connection_type = opts[:type]
  @connection_type ||= :em

  @task = opts[:task]
end

Instance Method Details

#em_reduce(all = nil, &blk) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/map_reduce/reducer.rb', line 29

# Callback-driven reduce: repeatedly asks a randomly chosen socket for the
# next ["reduce", @task] reply and hands each reply to the block.
#
# all - pool of sockets still to be drained; when nil, a copy of `sockets`
#       is used. The pool is mutated: a socket is removed once its reply
#       has a nil first element.
# blk - called as blk.call(key, values) for every reply whose first element
#       is non-nil, and finally as blk.call([nil]) once the pool is empty.
#
# Returns whatever the socket's send_request returns, or the block's result
# for the final blk.call([nil]).
def em_reduce(all = nil, &blk)
  all = sockets.dup unless all
  sock = all.sample

  # Pool exhausted: notify the caller that there is nothing left to reduce.
  return blk.call([nil]) unless sock

  sock.send_request(["reduce", @task]) do |message|
    key, *values = message
    # A nil key means this socket has no more data for the task.
    key.nil? ? all.delete(sock) : blk.call(key, values)

    # Keep going with whatever sockets remain in the pool.
    em_reduce(all, &blk)
  end
end

#reduce(&blk) ⇒ Object



9
10
11
12
13
14
15
# File 'lib/map_reduce/reducer.rb', line 9

# Entry point for reducing: forwards the block to em_reduce when the
# connection type is :em, and to sync_reduce for any other value.
#
# Returns the result of whichever method was invoked.
def reduce(&blk)
  return em_reduce(&blk) if @connection_type == :em

  sync_reduce(&blk)
end

#sync_reduce(&blk) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/map_reduce/reducer.rb', line 17

# Blocking reduce: works on a copy of `sockets`, repeatedly picking a random
# one and requesting ["reduce", @task] from it.
#
# Each reply is split into a key and the remaining values. A nil key removes
# that socket from the working copy; otherwise blk.call(key, values) is
# invoked. The loop ends when no socket can be sampled any more.
#
# Returns nil.
def sync_reduce(&blk)
  remaining = sockets.dup

  loop do
    sock = remaining.sample
    break unless sock

    key, *values = sock.send_request(["reduce", @task])
    if key.nil?
      # This socket is drained; stop polling it.
      remaining.delete(sock)
    else
      blk.call(key, values)
    end
  end
end