Class: MapReduce::Reducer

Inherits:
Object
  • Object
show all
Defined in:
lib/map_reduce/reducer.rb

Constant Summary collapse

TIMEOUT =
0.1

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Reducer

Returns a new instance of Reducer.



5
6
7
8
9
# File 'lib/map_reduce/reducer.rb', line 5

# Builds a reducer from an options hash.
#
# @param opts [Hash] recognised keys:
#   - :masters — stored as @masters; when missing or falsy, a one-element
#     array holding ::MapReduce::DEFAULT_SOCKET is used instead
#   - :type    — stored as @connection_type; when missing or falsy, :em
#   - :task    — stored as @task unchanged (nil when absent)
def initialize(opts = {})
  masters  = opts[:masters]
  # The default constant is only resolved when no masters were supplied.
  @masters = masters || [::MapReduce::DEFAULT_SOCKET]

  type             = opts[:type]
  @connection_type = type || :em

  @task = opts[:task]
end

Instance Method Details

#em_reduce(all = nil, &blk) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/map_reduce/reducer.rb', line 31

# Callback-driven reduce: repeatedly asks a randomly sampled socket for the
# next message of @task and hands each result to the block.
#
# Each reply is split into a leading key and the remaining values. A nil key
# removes that socket from the working pool; any other key is passed on as
# blk.call(key, values). After every reply the method calls itself again with
# the same pool. Once no socket can be sampled, the block is invoked one last
# time with the single argument [nil].
#
# @param all [Array, nil] working pool of sockets; when nil, a copy of
#   `sockets` (defined outside this block) is used, so that list is not
#   mutated. A pool passed in explicitly IS mutated.
# @yield [key, values] for every reply whose key is not nil
# @return [Object] whatever send_request (or the final block call) returns
def em_reduce(all = nil, &blk)
  pool   = all || sockets.dup
  chosen = pool.sample

  # Nothing left to ask: signal completion with the [nil] sentinel.
  return blk.call([nil]) unless chosen

  chosen.send_request(["reduce", @task]) do |reply|
    head, *rest = reply

    if head.nil?
      pool.delete(chosen)
    else
      blk.call(head, rest)
    end

    # Continue with the (possibly shrunk) pool.
    em_reduce(pool, &blk)
  end
end

#reduce(&blk) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/map_reduce/reducer.rb', line 11

# Entry point: forwards the block to em_reduce when @connection_type is :em,
# and to sync_reduce for any other value.
#
# @yield passed through unchanged to the selected implementation
# @return [Object] the return value of the selected implementation
def reduce(&blk)
  return em_reduce(&blk) if @connection_type == :em

  sync_reduce(&blk)
end

#sync_reduce(&blk) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/map_reduce/reducer.rb', line 19

# Blocking reduce: loops over a private copy of `sockets` (defined outside
# this block), each time asking a randomly sampled socket for the next
# message of @task.
#
# The reply is split into a leading key and the remaining values. A non-nil
# key is forwarded as blk.call(key, values); a nil key removes that socket
# from the copy. The loop ends when no socket can be sampled any more.
#
# @yield [key, values] for every reply whose key is not nil
# @return [nil]
def sync_reduce(&blk)
  remaining = sockets.dup

  while (source = remaining.sample)
    reply = source.send_request(["reduce", @task])
    head, *rest = reply

    unless head.nil?
      blk.call(head, rest)
      next
    end

    # A nil key means this socket has nothing more to give.
    remaining.delete(source)
  end
end