Class: RubyReactor::Map::Collector

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/ruby_reactor/map/collector.rb

Class Method Summary collapse

Methods included from Helpers

apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution

Class Method Details

.perform(arguments) ⇒ Object

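Collects the stored results of a map step, combines them into a single result, and resumes the parent reactor's execution with it. (Source annotation: rubocop:disable Metrics/MethodLength)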



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/ruby_reactor/map/collector.rb', line 9

# Collects the stored results of a map step, folds them into one final
# result and resumes the parent reactor with it.
#
# Each stored entry that is a Hash carrying an "_error" key is wrapped as a
# RubyReactor::Failure; every other entry is wrapped as a
# RubyReactor::Success.
#
# When the step declares a collect block, it receives all wrapped results
# (successes and failures) and its return value becomes the Success payload;
# a StandardError raised by the block becomes a Failure. Without a collect
# block the first Failure wins, otherwise the Success values are returned.
#
# @param arguments [Hash] job payload with String or Symbol keys:
#   parent_context_id, map_id, parent_reactor_class_name, step_name and
#   strict_ordering.
# @return [Object] whatever resume_parent_execution returns.
def self.perform(arguments)
  arguments = arguments.transform_keys(&:to_sym)
  parent_context_id = arguments[:parent_context_id]
  map_id = arguments[:map_id]
  parent_reactor_class_name = arguments[:parent_reactor_class_name]
  step_name = arguments[:step_name]
  strict_ordering = arguments[:strict_ordering]

  storage = RubyReactor.configuration.storage_adapter

  # Retrieve parent context
  parent_context = load_parent_context_from_storage(
    parent_context_id,
    parent_reactor_class_name,
    storage
  )

  # Retrieve results
  serialized_results = storage.retrieve_map_results(map_id, parent_reactor_class_name,
                                                    strict_ordering: strict_ordering)

  results = serialized_results.map do |r|
    if r.is_a?(Hash) && r.key?("_error")
      RubyReactor::Failure(r["_error"])
    else
      RubyReactor::Success(r)
    end
  end

  # Get step config to check for collect block and other options
  parent_class = Object.const_get(parent_reactor_class_name)
  step_config = parent_class.steps[step_name.to_sym]

  # The collect block argument is optional: look it up nil-safely so a step
  # without one reaches the default aggregation below instead of raising
  # NoMethodError on nil.
  collect_argument = step_config.arguments[:collect_block]
  collect_block = collect_argument && collect_argument[:source]&.value
  # TODO: Check allow_partial_failure option

  final_result = if collect_block
                   begin
                     # Pass all results (Success and Failure) to collect block
                     collected = collect_block.call(results)
                     RubyReactor::Success(collected)
                   rescue StandardError => e
                     RubyReactor::Failure(e)
                   end
                 else
                   # Default behavior: fail if any failure
                   first_failure = results.find(&:failure?)
                   first_failure || RubyReactor::Success(results.map(&:value))
                 end

  # Resume execution
  resume_parent_execution(parent_context, step_name, final_result, storage)
end