Module: ChronoForge::Executor

Includes:
Methods
Defined in:
lib/chrono_forge/executor.rb,
lib/chrono_forge/executor/context.rb,
lib/chrono_forge/executor/methods.rb,
lib/chrono_forge/executor/methods/wait.rb,
lib/chrono_forge/executor/lock_strategy.rb,
lib/chrono_forge/executor/retry_strategy.rb,
lib/chrono_forge/executor/execution_tracker.rb,
lib/chrono_forge/executor/methods/wait_until.rb,
lib/chrono_forge/executor/methods/continue_if.rb,
lib/chrono_forge/executor/methods/durably_repeat.rb,
lib/chrono_forge/executor/methods/durably_execute.rb,
lib/chrono_forge/executor/methods/workflow_states.rb

Defined Under Namespace

Modules: Methods Classes: ConcurrentExecutionError, Context, Error, ExecutionFailedError, ExecutionFlowControl, ExecutionTracker, HaltExecutionFlow, LockStrategy, LongRunningConcurrentExecutionError, NotExecutableError, RetryStrategy, WaitConditionNotMet, WorkflowNotRetryableError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Methods::DurablyRepeat

#durably_repeat

Methods included from Methods::DurablyExecute

#durably_execute

Methods included from Methods::ContinueIf

#continue_if

Methods included from Methods::WaitUntil

#wait_until

Methods included from Methods::Wait

#wait

Class Method Details

.prepended(base) ⇒ Object

Add class methods



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/chrono_forge/executor.rb', line 18

def self.prepended(base)
  class << base
    # Enforce expected signature for perform_now with key as first arg and keywords after
    def perform_now(key, **kwargs)
      if !key.is_a?(String)
        raise ArgumentError, "Workflow key must be a string as the first argument"
      end
      super
    end

    # Enforce expected signature for perform_later with key as first arg and keywords after
    def perform_later(key, **kwargs)
      if !key.is_a?(String)
        raise ArgumentError, "Workflow key must be a string as the first argument"
      end
      super
    end

    # Add retry_now class method that calls perform_now with retry_workflow: true
    def retry_now(key, **kwargs)
      perform_now(key, retry_workflow: true, **kwargs)
    end

    # Add retry_later class method that calls perform_later with retry_workflow: true
    def retry_later(key, **kwargs)
      perform_later(key, retry_workflow: true, **kwargs)
    end
  end
end

Instance Method Details

#perform(key, attempt: 0, retry_workflow: false, options: {}, **kwargs) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/chrono_forge/executor.rb', line 48

def perform(key, attempt: 0, retry_workflow: false, options: {}, **kwargs)
  # Prevent excessive retries
  if attempt >= self.class::RetryStrategy.max_attempts
    Rails.logger.error { "ChronoForge:#{self.class} max attempts reached for job workflow(#{key})" }
    return
  end

  # Find or create workflow instance
  setup_workflow!(key, options, kwargs)

  # Handle retry parameter - unlock and continue execution
  retry_workflow! if retry_workflow

  # Track if we acquired the lock
  lock_acquired = false

  begin
    # Acquire lock with advanced concurrency protection
    @workflow = self.class::LockStrategy.acquire_lock(job_id, workflow, max_duration: max_duration)
    lock_acquired = true

    # Setup context
    setup_context!

    # Execute core job logic
    super(**workflow.kwargs.symbolize_keys)

    # Mark as complete
    complete_workflow!
  rescue ExecutionFailedError => e
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) step execution failed" }
    self.class::ExecutionTracker.track_error(workflow, e)
    workflow.stalled!
    nil
  rescue HaltExecutionFlow
    # Halt execution
    Rails.logger.debug { "ChronoForge:#{self.class}(#{key}) execution halted" }
    nil
  rescue ConcurrentExecutionError
    # Graceful handling of concurrent execution
    Rails.logger.warn { "ChronoForge:#{self.class}(#{key}) concurrent execution detected" }
    nil
  rescue NotExecutableError
    raise
  rescue => e
    Rails.logger.error { "ChronoForge:#{self.class}(#{key}) workflow execution failed" }
    error_log = self.class::ExecutionTracker.track_error(workflow, e)

    # Retry if applicable
    if should_retry?(e, attempt)
      self.class::RetryStrategy.schedule_retry(workflow, attempt: attempt)
    else
      fail_workflow! error_log
    end
  ensure
    if lock_acquired # Only release lock if we acquired it
      context.save!
      self.class::LockStrategy.release_lock(job_id, workflow)
    end
  end
end