Class: RubyReactor::Map::ElementExecutor

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/ruby_reactor/map/element_executor.rb

Class Method Summary collapse

Methods included from Helpers

apply_collect_block, build_element_inputs, load_parent_context_from_storage, resolve_reactor_class, resume_parent_execution

Class Method Details

.perform(arguments) ⇒ Object

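Executes (or resumes) the reactor for a single map element, stores its result or error under the element's index, decrements the map's pending counter, and triggers asynchronous result collection when the counter reaches zero.

(The source comment above this method is only the linter directive `rubocop:disable Metrics/MethodLength`.)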



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/ruby_reactor/map/element_executor.rb', line 9

# Runs one element of a map step and records its outcome in storage.
#
# A new Context is built from the serialized inputs unless a serialized
# context is supplied, in which case that context is deserialized and its
# execution is resumed instead.
#
# @param arguments [Hash] job arguments; keys may be strings or symbols.
#   Keys read here: :map_id, :element_id, :index, :serialized_inputs,
#   :reactor_class_info, :strict_ordering, :parent_context_id,
#   :parent_reactor_class_name, :step_name, :batch_size, :serialized_context.
# @return [Object, nil] nil on the early exits (retry queued, or other
#   elements still pending); otherwise whatever
#   perform_map_collection_async returns.
def self.perform(arguments)
  # Normalize keys to symbols so the lookups below work for string-keyed hashes too.
  arguments = arguments.transform_keys(&:to_sym)
  map_id = arguments[:map_id]
  # Read but never used in this method (hence the underscore prefix).
  _element_id = arguments[:element_id]
  index = arguments[:index]
  serialized_inputs = arguments[:serialized_inputs]
  reactor_class_info = arguments[:reactor_class_info]
  strict_ordering = arguments[:strict_ordering]
  parent_context_id = arguments[:parent_context_id]
  parent_reactor_class_name = arguments[:parent_reactor_class_name]
  step_name = arguments[:step_name]
  batch_size = arguments[:batch_size]
  # rubocop:enable Metrics/MethodLength
  serialized_context = arguments[:serialized_context]

  # A serialized context means this element was already started: rebuild the
  # context from it. Otherwise create a fresh context from the inputs.
  if serialized_context
    context = ContextSerializer.deserialize(serialized_context)
    # NOTE(review): the attribute name between `.` and `=` is missing in this
    # listing (likely lost during doc extraction); as shown this line is not
    # valid Ruby. Confirm the real setter name against
    # lib/ruby_reactor/map/element_executor.rb.
    context. = arguments
    reactor_class = context.reactor_class
  else
    # Deserialize inputs
    inputs = ContextSerializer.deserialize_value(serialized_inputs)

    # Resolve reactor class
    reactor_class = resolve_reactor_class(reactor_class_info)

    # Create context
    context = Context.new(inputs, reactor_class)
    # NOTE(review): same missing attribute name as in the branch above.
    context. = arguments
  end
  storage = RubyReactor.configuration.storage_adapter

  # Execute
  executor = Executor.new(reactor_class, {}, context)

  # Resume when a previously serialized context was supplied; otherwise run from the start.
  if serialized_context
    executor.resume_execution
  else
    executor.execute
  end

  result = executor.result

  # When the result is a RetryQueuedResult, nothing is stored and the counter
  # is not decremented; only queue_next_batch (defined outside this listing)
  # is called, and only when a batch size was given.
  if result.is_a?(RetryQueuedResult)
    queue_next_batch(arguments) if batch_size
    return
  end

  # Store result

  # A failure is stored at the same map index, wrapped as { _error: ... }.

  if result.success?
    storage.store_map_result(map_id, index, result.value, parent_reactor_class_name,
                             strict_ordering: strict_ordering)
  else
    # Store error
    storage.store_map_result(map_id, index, { _error: result.error }, parent_reactor_class_name,
                             strict_ordering: strict_ordering)
  end

  # Decrement counter
  new_count = storage.decrement_map_counter(map_id, parent_reactor_class_name)

  queue_next_batch(arguments) if batch_size

  # Only the call that brings the counter to zero goes on to trigger collection.
  return unless new_count.zero?

  # Trigger collection
  # NOTE(review): timeout is hard-coded to 3600; units are not visible here
  # (presumably seconds) — confirm against the async router.
  RubyReactor.configuration.async_router.perform_map_collection_async(
    parent_context_id: parent_context_id,
    map_id: map_id,
    parent_reactor_class_name: parent_reactor_class_name,
    step_name: step_name,
    strict_ordering: strict_ordering,
    timeout: 3600
  )
end