Class: RubyReactor::Map::ElementExecutor
- Inherits:
-
Object
- Object
- RubyReactor::Map::ElementExecutor
- Extended by:
- Helpers
- Defined in:
- lib/ruby_reactor/map/element_executor.rb
Class Method Summary collapse
-
.perform(arguments) ⇒ Object
rubocop:disable Metrics/MethodLength.
Methods included from Helpers
apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution
Class Method Details
.perform(arguments) ⇒ Object
rubocop:disable Metrics/MethodLength
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/ruby_reactor/map/element_executor.rb', line 9 def self.perform(arguments) arguments = arguments.transform_keys(&:to_sym) map_id = arguments[:map_id] _element_id = arguments[:element_id] index = arguments[:index] serialized_inputs = arguments[:serialized_inputs] reactor_class_info = arguments[:reactor_class_info] strict_ordering = arguments[:strict_ordering] parent_context_id = arguments[:parent_context_id] parent_reactor_class_name = arguments[:parent_reactor_class_name] step_name = arguments[:step_name] batch_size = arguments[:batch_size] # rubocop:enable Metrics/MethodLength serialized_context = arguments[:serialized_context] if serialized_context context = ContextSerializer.deserialize(serialized_context) context. = arguments reactor_class = context.reactor_class else # Deserialize inputs inputs = ContextSerializer.deserialize_value(serialized_inputs) # Resolve reactor class reactor_class = resolve_reactor_class(reactor_class_info) # Create context context = Context.new(inputs, reactor_class) context. = arguments end storage = RubyReactor.configuration.storage_adapter # Execute executor = Executor.new(reactor_class, {}, context) if serialized_context executor.resume_execution else executor.execute end result = executor.result if result.is_a?(RetryQueuedResult) queue_next_batch(arguments) if batch_size return end # Store result # Store result if result.success? storage.store_map_result(map_id, index, result.value, parent_reactor_class_name, strict_ordering: strict_ordering) else # Store error storage.store_map_result(map_id, index, { _error: result.error }, parent_reactor_class_name, strict_ordering: strict_ordering) end # Decrement counter new_count = storage.decrement_map_counter(map_id, parent_reactor_class_name) queue_next_batch(arguments) if batch_size return unless new_count.zero? # Trigger collection RubyReactor.configuration.async_router.perform_map_collection_async( parent_context_id: parent_context_id, map_id: map_id, parent_reactor_class_name: parent_reactor_class_name, step_name: step_name, strict_ordering: strict_ordering, timeout: 3600 ) end |