Module: ChronoForge::Executor::Methods::DurablyRepeat

Included in:
ChronoForge::Executor::Methods
Defined in:
lib/chrono_forge/executor/methods/durably_repeat.rb

Instance Method Summary collapse

Instance Method Details

#durably_repeat(method, every:, till:, start_at: nil, max_attempts: 3, timeout: 1.hour, on_error: :continue, name: nil) ⇒ nil

Schedules a method to be called repeatedly at specified intervals until a condition is met.

This method provides durable, idempotent periodic task execution with automatic catch-up for missed executions using timeout-based fast-forwarding. Each repetition gets its own execution log, ensuring proper tracking and retry behavior.

Behavior

Method Parameters

Your periodic method can optionally receive the scheduled execution time:

  • Method with no parameters: ‘def my_task; end` - called as `my_task()`

  • Method with parameter: ‘def my_task(next_execution_at); end` - called as `my_task(scheduled_time)`

This allows methods to:

  • Log lateness/timing information

  • Perform time-based calculations

  • Include scheduled time in notifications

  • Generate reports for specific time periods

Idempotency

Each execution gets a unique step name based on the scheduled execution time, ensuring that workflow replays don’t create duplicate tasks.

Catch-up Mechanism

If a workflow is paused and resumes later, the timeout parameter handles catch-up:

  • Executions older than ‘timeout` are automatically skipped

  • The periodic schedule integrity is maintained

  • Eventually reaches current/future execution times

Error Handling

  • Individual execution failures are retried up to ‘max_attempts` with exponential backoff

  • After max attempts, behavior depends on ‘on_error` parameter:

    • ‘:continue`: Failed execution is logged, next execution is scheduled

    • ‘:fail_workflow`: ExecutionFailedError is raised, failing the entire workflow

  • Timeouts are not considered errors and always continue to the next execution

Execution Logs

Creates two types of execution logs:

  • Coordination log: ‘durably_repeat$#name` - tracks overall periodic task state

  • Repetition logs: ‘durably_repeat$#name$#timestamp` - tracks individual executions

Examples:

Basic usage

durably_repeat :send_reminder_email, every: 3.days, till: :user_onboarded?

Method with scheduled execution time parameter

def send_reminder_email(next_execution_at)
  # Can access the scheduled execution time
  lateness = Time.current - next_execution_at
  Rails.logger.info "Email scheduled for #{next_execution_at}, running #{lateness.to_i}s late"
  UserMailer.reminder_email(user_id, scheduled_for: next_execution_at).deliver_now
end

durably_repeat :send_reminder_email, every: 3.days, till: :user_onboarded?

Resilient background task (default)

durably_repeat :cleanup_temp_files,
  every: 1.day,
  till: :cleanup_complete?,
  on_error: :continue

Critical task that should fail workflow on error

durably_repeat :process_payments,
  every: 1.hour,
  till: :all_payments_processed?,
  on_error: :fail_workflow

Advanced usage with all options

def generate_daily_report(scheduled_time)
  report_date = scheduled_time.to_date
  DailyReportService.new(date: report_date).generate
end

durably_repeat :generate_daily_report,
  every: 1.day,
  till: :reports_complete?,
  start_at: Date.tomorrow.beginning_of_day,
  max_attempts: 5,
  timeout: 2.hours,
  on_error: :fail_workflow,
  name: "daily_reports"

Parameters:

  • method (Symbol)

    The name of the instance method to execute repeatedly. The method can optionally accept the scheduled execution time as its first argument.

  • every (ActiveSupport::Duration)

    The interval between executions (e.g., 3.days, 1.hour)

  • till (Symbol, Proc)

    The condition to check for stopping repetition. Should return true when repetition should stop. Can be a symbol for instance methods or a callable.

  • start_at (Time, nil) (defaults to: nil)

    When to start the periodic task. Defaults to coordination_log.created_at + every

  • max_attempts (Integer) (defaults to: 3)

    Maximum retry attempts per individual execution (default: 3)

  • timeout (ActiveSupport::Duration) (defaults to: 1.hour)

    How long after scheduled time an execution is considered stale and skipped (default: 1.hour). This enables catch-up behavior.

  • on_error (Symbol) (defaults to: :continue)

    How to handle repetition failures after max_attempts. Options:

    • :continue (default): Log failure and continue with next scheduled execution

    • :fail_workflow: Raise ExecutionFailedError to fail the entire workflow

  • name (String, nil) (defaults to: nil)

    Custom name for the periodic task. Defaults to method name. Used to create unique step names for execution logs.

Returns:

  • (nil)


103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/chrono_forge/executor/methods/durably_repeat.rb', line 103

def durably_repeat(method, every:, till:, start_at: nil, max_attempts: 3, timeout: 1.hour, on_error: :continue, name: nil)
  step_name = "durably_repeat$#{name || method}"

  # Get or create the main coordination log for this periodic task
  coordination_log = ExecutionLog.create_or_find_by!(
    workflow: @workflow,
    step_name: step_name
  ) do |log|
    log.started_at = Time.current
    log. = {last_execution_at: nil}
  end

  # Return if already completed
  return if coordination_log.completed?

  # Update coordination log attempt tracking
  coordination_log.update!(
    attempts: coordination_log.attempts + 1,
    last_executed_at: Time.current
  )

  # Check if we should stop repeating
  condition_met = if till.is_a?(Symbol)
    send(till)
  else
    till.call(context)
  end
  if condition_met
    coordination_log.update!(
      state: :completed,
      completed_at: Time.current
    )
    return
  end

  # Calculate next execution time
   = coordination_log.
  last_execution_at = ["last_execution_at"] ? Time.parse(["last_execution_at"]) : nil

  next_execution_at = if last_execution_at
    last_execution_at + every
  elsif start_at
    start_at
  else
    coordination_log.created_at + every
  end

  execute_or_schedule_repetition(method, coordination_log, next_execution_at, every, max_attempts, timeout, on_error)
  nil
end