Class: Tasker::Orchestration::TaskReenqueuer

Inherits:
Object
Includes:
Concerns::EventPublisher, Concerns::IdempotentStateTransitions
Defined in:
lib/tasker/orchestration/task_reenqueuer.rb

Overview

TaskReenqueuer handles the mechanics of re-enqueueing tasks for continued processing.

This class implements the task re-enqueueing logic and fires lifecycle events for observability. It separates the decision to re-enqueue, which belongs to the caller, from the mechanics of how re-enqueueing works.
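The split between decision and mechanics can be sketched with plain Ruby stand-ins. Everything in this example is hypothetical: `SketchTask`, `SketchReenqueuer`, `SketchFinalizer`, and the reason strings are illustrative names, not part of Tasker, and the real class also publishes events and enqueues an ActiveJob.

```ruby
# Hypothetical stand-ins; none of these classes are part of Tasker.
SketchTask = Struct.new(:task_id, :pending_steps, :retry_after)

# Mechanics only: knows HOW to re-enqueue, never WHETHER to.
class SketchReenqueuer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def reenqueue_task(task, reason:)
    @calls << [:now, task.task_id, reason]
    true
  end

  def reenqueue_task_delayed(task, delay_seconds:, reason:)
    @calls << [:delayed, task.task_id, delay_seconds, reason]
    true
  end
end

# Decision only: inspects the task and picks a strategy.
class SketchFinalizer
  def initialize(reenqueuer)
    @reenqueuer = reenqueuer
  end

  def finalize(task)
    if task.retry_after
      # Placeholder reason string; the real value lives in a Tasker constant.
      @reenqueuer.reenqueue_task_delayed(task, delay_seconds: task.retry_after,
                                               reason: 'retry_backoff')
    elsif task.pending_steps.positive?
      @reenqueuer.reenqueue_task(task, reason: 'pending_steps_remaining')
    else
      false # nothing left to do, so nothing is re-enqueued
    end
  end
end
```

With this shape, the re-enqueue policy can change without touching the code that talks to the job queue.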

Instance Method Summary

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::IdempotentStateTransitions

#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to

Instance Method Details

#reenqueue_task(task, reason: Constants::TaskFinalization::ReenqueueReasons::PENDING_STEPS_REMAINING) ⇒ Boolean

Re-enqueue a task for continued processing

Parameters:

  • task (Tasker::Task)

    The task to re-enqueue

  • reason (String) (defaults to: Constants::TaskFinalization::ReenqueueReasons::PENDING_STEPS_REMAINING)

    The reason for re-enqueueing (for observability)

Returns:

  • (Boolean)

    true if the task was re-enqueued; false if a StandardError was raised and rescued along the way



# File 'lib/tasker/orchestration/task_reenqueuer.rb', line 22

def reenqueue_task(task, reason: Constants::TaskFinalization::ReenqueueReasons::PENDING_STEPS_REMAINING)
  # Fire re-enqueue started event
  publish_task_reenqueue_started(task, reason: reason)

  # Transition task back to pending state for clarity
  if safe_transition_to(task, Tasker::Constants::TaskStatuses::PENDING)
    Rails.logger.debug { "TaskReenqueuer: Task #{task.task_id} transitioned back to pending" }
  end

  # Enqueue the task for processing
  Tasker::TaskRunnerJob.perform_later(task.task_id)

  # Fire re-enqueue requested event now that the job has been enqueued
  publish_task_reenqueue_requested(task, reason: reason)

  Rails.logger.debug { "TaskReenqueuer: Task #{task.task_id} re-enqueued due to #{reason}" }
  true
rescue StandardError => e
  # Fire re-enqueue failed event
  publish_task_reenqueue_failed(task, reason: reason, error: e.message)

  Rails.logger.error("TaskReenqueuer: Failed to re-enqueue task #{task.task_id}: #{e.message}")
  false
end
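
Because the method rescues StandardError and returns false, callers see failures only through the return value and the failed event, never as an exception. The following is a minimal sketch of that control flow, assuming injected collaborators in place of Rails, ActiveJob, and the Tasker concerns; `FlowSketch`, its `enqueue:` argument, and the event symbols are hypothetical, and the state transition step is omitted for brevity.

```ruby
# Stand-in that mirrors the control flow of #reenqueue_task.
class FlowSketch
  attr_reader :events

  def initialize(enqueue:)
    @enqueue = enqueue # callable standing in for TaskRunnerJob.perform_later
    @events = []       # recorded in place of published lifecycle events
  end

  def reenqueue_task(task_id, reason: 'pending_steps_remaining')
    @events << [:reenqueue_started, task_id, reason]
    @enqueue.call(task_id)
    @events << [:reenqueue_requested, task_id, reason]
    true
  rescue StandardError => e
    # Any failure is converted into an event plus a false return value.
    @events << [:reenqueue_failed, task_id, reason, e.message]
    false
  end
end

ok = FlowSketch.new(enqueue: ->(_id) { :queued })
ok.reenqueue_task(42)        # => true

broken = FlowSketch.new(enqueue: ->(_id) { raise 'queue unavailable' })
broken.reenqueue_task(42)    # => false
broken.events.map(&:first)   # => [:reenqueue_started, :reenqueue_failed]
```

A caller that needs to react to a failed re-enqueue should therefore check the Boolean result rather than wrap the call in its own rescue.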

#reenqueue_task_delayed(task, delay_seconds:, reason: Constants::TaskFinalization::ReenqueueReasons::RETRY_BACKOFF) ⇒ Boolean

Schedule a delayed re-enqueue (for retry scenarios)

Parameters:

  • task (Tasker::Task)

    The task to re-enqueue

  • delay_seconds (Integer)

    Number of seconds to delay

  • reason (String) (defaults to: Constants::TaskFinalization::ReenqueueReasons::RETRY_BACKOFF)

    The reason for delayed re-enqueueing

Returns:

  • (Boolean)

    true if the delayed job was scheduled; false if a StandardError was raised and rescued along the way



# File 'lib/tasker/orchestration/task_reenqueuer.rb', line 53

def reenqueue_task_delayed(task, delay_seconds:,
                           reason: Constants::TaskFinalization::ReenqueueReasons::RETRY_BACKOFF)
  # Fire delayed re-enqueue started event
  publish_task_reenqueue_delayed(task, delay_seconds: delay_seconds, reason: reason)

  # Schedule the delayed job
  Tasker::TaskRunnerJob.set(wait: delay_seconds.seconds).perform_later(task.task_id)

  Rails.logger.debug do
    "TaskReenqueuer: Task #{task.task_id} scheduled for re-enqueue in #{delay_seconds} seconds"
  end
  true
rescue StandardError => e
  Rails.logger.error("TaskReenqueuer: Failed to schedule delayed re-enqueue for task #{task.task_id}: #{e.message}")
  false
end
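
Since delay_seconds is an Integer, a caller that tracks retries as timestamps has to convert them first. A minimal sketch of one way to do that follows; `delay_seconds_until` and `schedule_retry` are hypothetical helpers, not part of Tasker, and `reenqueuer` is assumed to be any object responding to the two methods documented on this page.

```ruby
# Hypothetical helper: converts a retry timestamp into a non-negative
# Integer suitable for the delay_seconds: argument.
def delay_seconds_until(next_retry_at, now: Time.now)
  [(next_retry_at - now).ceil, 0].max
end

# Hypothetical caller: re-enqueues immediately when the retry time has
# already passed, otherwise schedules a delayed re-enqueue.
def schedule_retry(reenqueuer, task, next_retry_at, now: Time.now)
  delay = delay_seconds_until(next_retry_at, now: now)
  if delay.zero?
    reenqueuer.reenqueue_task(task)
  else
    reenqueuer.reenqueue_task_delayed(task, delay_seconds: delay)
  end
end

delay_seconds_until(Time.at(1030), now: Time.at(1000)) # => 30
```

Rounding up with `ceil` keeps the job from running slightly before the intended retry time, and the clamp to zero avoids passing a negative wait to the scheduler.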