Module: Bramble::Map
- Extended by:
- Keys
- Defined in:
- lib/bramble/map.rb
Class Method Summary collapse
-
.perform(handle, implementation, values) ⇒ Object
For each of ‘values`, queue up a job to call the map function.
-
.perform_map(handle, implementation, value) ⇒ Object
Call ‘.map` on `value`, storing the result for `.reduce` and handling any error.
Methods included from Keys
data_key, finished_at_key, job_id_key, keys_key, map_finished_count_key, map_total_count_key, namespace, reduce_finished_count_key, reduce_total_count_key, result_key, status_key
Class Method Details
.perform(handle, implementation, values) ⇒ Object
For each of ‘values`, queue up a job to call the map function
8 9 10 11 12 13 14 15 |
# File 'lib/bramble/map.rb', line 8 def perform(handle, implementation, values) Bramble::State.running?(handle) do storage.set(map_total_count_key(handle), values.length) values.each do |value| Bramble::MapJob.perform_later(handle, implementation.name, Bramble::Serialize.dump(value)) end end end |
.perform_map(handle, implementation, value) ⇒ Object
Call ‘.map` on `value`, storing the result for `.reduce` and handling any error.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/bramble/map.rb', line 18 def perform_map(handle, implementation, value) Bramble::State.running?(handle) do impl_keys_key = keys_key(handle) Bramble::ErrorHandling.rescuing(implementation) do # Execute the provided map function implementation.map(value) do |map_key, map_val| Bramble::State.running?(handle) do raw_key = Bramble::Serialize.dump(map_key) raw_value = Bramble::Serialize.dump(map_val) # Push the result to be reduced storage.map_keys_push(impl_keys_key, raw_key) storage.map_result_push(data_key(handle, raw_key), raw_value) end end end # Mark this item as mapped (even if there was an error) Bramble::State.running?(handle) do finished = storage.increment(map_finished_count_key(handle)) total = storage.get(map_total_count_key(handle)).to_i if finished == total Bramble::Reduce.perform(handle, implementation) end end end end |