Module: Bramble::Map

Extended by:
Keys
Defined in:
lib/bramble/map.rb

Class Method Summary collapse

Methods included from Keys

data_key, finished_at_key, job_id_key, keys_key, map_finished_count_key, map_total_count_key, namespace, reduce_finished_count_key, reduce_total_count_key, result_key, status_key

Class Method Details

.perform(handle, implementation, values) ⇒ Object

For each of `values`, queue up a job to call the map function.



8
9
10
11
12
13
14
15
# File 'lib/bramble/map.rb', line 8

# Start the map phase: record how many items there are, then enqueue one
# Bramble::MapJob per item.
#
# All of the work happens inside the block handed to
# Bramble::State.running?, so nothing is stored or enqueued unless that
# call yields (presumably only while the job for +handle+ is still
# running -- confirm against Bramble::State).
#
# NOTE(review): when +values+ is empty the total is stored as 0 and no
# MapJob is enqueued by this method.
#
# @param handle the job handle, used to build the storage key
# @param implementation an object responding to +name+; only the name is
#   passed on to the queued job
# @param values a collection responding to +length+ and +each+
# @return [Object] whatever Bramble::State.running? returns
def perform(handle, implementation, values)
  Bramble::State.running?(handle) do
    # Store the total count before any job is enqueued.
    storage.set(map_total_count_key(handle), values.length)

    values.each do |item|
      implementation_name = implementation.name
      serialized_item = Bramble::Serialize.dump(item)
      Bramble::MapJob.perform_later(handle, implementation_name, serialized_item)
    end
  end
end

.perform_map(handle, implementation, value) ⇒ Object

Call `.map` on `value`, storing the result for `.reduce` and handling any error.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/bramble/map.rb', line 18

# Run the implementation's map function for one +value+, store every pair
# it emits, then count this item as finished and start the reduce phase
# once the finished count equals the stored total.
#
# Every step is wrapped in Bramble::State.running?(handle), so it only runs
# when that call yields (presumably only while the job is still running --
# confirm against Bramble::State).
#
# @param handle the job handle, used to build all storage keys
# @param implementation an object responding to +map(value) { |key, val| }+;
#   also handed to Bramble::ErrorHandling.rescuing and Bramble::Reduce.perform
# @param value the single input item to map
# @return [Object] whatever the outer Bramble::State.running? call returns
def perform_map(handle, implementation, value)
  Bramble::State.running?(handle) do
    emitted_keys_key = keys_key(handle)

    Bramble::ErrorHandling.rescuing(implementation) do
      # Run the user-supplied map function; it yields zero or more pairs.
      implementation.map(value) do |emitted_key, emitted_value|
        # Check the state again for each emitted pair before storing it.
        Bramble::State.running?(handle) do
          serialized_key = Bramble::Serialize.dump(emitted_key)
          serialized_value = Bramble::Serialize.dump(emitted_value)
          # Record the key, then add the value under that key's data entry
          # so the reduce phase can find it.
          storage.map_keys_push(emitted_keys_key, serialized_key)
          storage.map_result_push(data_key(handle, serialized_key), serialized_value)
        end
      end
    end

    # Count this item as mapped. This runs after the rescuing block, so it
    # is reached even when the map function failed (assuming
    # ErrorHandling.rescuing does not re-raise -- confirm).
    Bramble::State.running?(handle) do
      finished_count = storage.increment(map_finished_count_key(handle))
      expected_count = storage.get(map_total_count_key(handle)).to_i
      Bramble::Reduce.perform(handle, implementation) if finished_count == expected_count
    end
  end
end