Class: RubyReactor::Map::Collector
- Inherits:
-
Object
- Object
- RubyReactor::Map::Collector
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/collector.rb
Class Method Summary collapse
-
.perform(arguments) ⇒ Object
Collects the stored results of a map step and resumes the parent reactor with the combined result.
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.perform(arguments) ⇒ Object
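Loads the parent context, wraps each stored map result as a Success or Failure, combines them (via the step's collect block when one is configured, otherwise failing on the first failure), and resumes the parent execution with the final result.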
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/ruby_reactor/map/collector.rb', line 9

# Collects the stored results of a map step and resumes the parent reactor
# with a single combined result.
#
# Each stored result is wrapped as a Failure when it is a Hash carrying an
# "_error" key, and as a Success otherwise. If the step declares a collect
# block, the block receives every wrapped result and its return value becomes
# the final Success; an exception raised by the block becomes the final
# Failure. Without a collect block, the first Failure wins, otherwise the
# unwrapped values are returned in a Success.
#
# @param arguments [Hash] job arguments keyed by String or Symbol; reads
#   :parent_context_id, :map_id, :parent_reactor_class_name, :step_name and
#   :strict_ordering
# @return [Object] whatever +resume_parent_execution+ returns
def self.perform(arguments)
  arguments = arguments.transform_keys(&:to_sym)
  parent_context_id = arguments[:parent_context_id]
  map_id = arguments[:map_id]
  parent_reactor_class_name = arguments[:parent_reactor_class_name]
  step_name = arguments[:step_name]
  strict_ordering = arguments[:strict_ordering]

  storage = RubyReactor.configuration.storage_adapter

  # Retrieve parent context
  parent_context = load_parent_context_from_storage(
    parent_context_id,
    parent_reactor_class_name,
    storage
  )

  # Retrieve results
  serialized_results = storage.retrieve_map_results(
    map_id,
    parent_reactor_class_name,
    strict_ordering: strict_ordering
  )

  results = serialized_results.map do |r|
    if r.is_a?(Hash) && r.key?("_error")
      RubyReactor::Failure(r["_error"])
    else
      RubyReactor::Success(r)
    end
  end

  # Get step config to check for collect block and other options
  parent_class = Object.const_get(parent_reactor_class_name)
  step_config = parent_class.steps[step_name.to_sym]

  # A step without a collect block may have no :collect_block entry (or no
  # :source) at all; look it up nil-safely so the default branch below is
  # reachable instead of raising NoMethodError on nil.
  collect_config = step_config.arguments[:collect_block]
  collect_block = collect_config && collect_config[:source]&.value

  # TODO: Check allow_partial_failure option

  final_result =
    if collect_block
      begin
        # Pass all results (Success and Failure) to collect block
        RubyReactor::Success(collect_block.call(results))
      rescue StandardError => e
        RubyReactor::Failure(e)
      end
    else
      # Default behavior: fail if any failure
      results.find(&:failure?) || RubyReactor::Success(results.map(&:value))
    end

  # Resume execution
  resume_parent_execution(parent_context, step_name, final_result, storage)
end