Class: MapReduce::Reducer
- Inherits: Object
- Object
- MapReduce::Reducer
- Defined in:
- lib/map_reduce/reducer.rb
Constant Summary collapse
- TIMEOUT = 0.1
Instance Method Summary collapse
- #em_reduce(all = nil, &blk) ⇒ Object
- #initialize(opts = {}) ⇒ Reducer (constructor): A new instance of Reducer.
- #reduce(&blk) ⇒ Object
- #sync_reduce(&blk) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Reducer
Returns a new instance of Reducer.
5 6 7 8 9 |
# File 'lib/map_reduce/reducer.rb', line 5 def initialize(opts = {}) @masters = opts[:masters] || [::MapReduce::DEFAULT_SOCKET] @connection_type = opts[:type] || :em @task = opts[:task] end |
Instance Method Details
#em_reduce(all = nil, &blk) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/map_reduce/reducer.rb', line 31 def em_reduce(all = nil, &blk) all ||= sockets.dup sock = all.sample if sock sock.send_request(["reduce", @task]) do |message| key, *values = message if key.nil? all.delete sock else blk.call(key, values) end em_reduce(all, &blk) end else blk.call([nil]) end end |
#reduce(&blk) ⇒ Object
11 12 13 14 15 16 17 |
# File 'lib/map_reduce/reducer.rb', line 11 def reduce(&blk) if @connection_type == :em em_reduce(&blk) else sync_reduce(&blk) end end |
#sync_reduce(&blk) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/map_reduce/reducer.rb', line 19 def sync_reduce(&blk) all = sockets.dup while sock = all.sample key, *values = sock.send_request(["reduce", @task]) if key.nil? all.delete sock else blk.call(key, values) end end end |