Class: MapReduce::Mapper
- Inherits:
-
Object
- Object
- MapReduce::Mapper
- Defined in:
- lib/map_reduce/mapper.rb
Instance Method Summary collapse
- #emit(key, value, &blk) ⇒ Object (also: #map)
-
#initialize(opts = {}) ⇒ Mapper
constructor
A new instance of Mapper.
- #wait_for_all(&blk) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Mapper
Returns a new instance of Mapper.
3 4 5 6 7 8 |
# File 'lib/map_reduce/mapper.rb', line 3 def initialize(opts = {}) @masters = opts[:masters] || [::MapReduce::DEFAULT_SOCKET] @connection_type = opts[:type] || :em @task_name = opts[:task] @disconnected = {} end |
Instance Method Details
#emit(key, value, &blk) ⇒ Object Also known as: map
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/map_reduce/mapper.rb', line 10 def emit(key, value, &blk) raise MapReduce::Exceptions::BlankKey, "Key can't be nil" if key.nil? sock = pick_master(key) sock.send_request(["map", key, value, @task_name]) do |res| if res @disconnected.delete(sock) if @disconnected[sock] if blk blk.call(res) else return res end else @disconnected[sock] = true emit(key, value, &blk) end end end |
#wait_for_all(&blk) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/map_reduce/mapper.rb', line 30 def wait_for_all(&blk) finished = Hash[socket.map{ |s| [s, false] }] sockets.each do |sock| sock.send_request(["map_finished", @task_name]) do || finished[sock] = [0] == "ok" if finished.all?{ |k,v| v } if block_given? blk.call else return end else after(1) do wait_for_all(&blk) end end end end end |