Class: Tasker::StepHandler::Base

Inherits:
Object
Includes:
Concerns::EventPublisher, Concerns::StructuredLogging, AutomaticEventPublishing
Defined in:
lib/tasker/step_handler/base.rb

Overview

Base class for all step handlers that defines the common interface and provides lifecycle event handling

⚠️ IMPORTANT DEVELOPER GUIDANCE:

  • NEVER override the handle() method - it's framework-only code
  • ALWAYS implement the process() method - that's your extension point
  • The handle() method automatically publishes lifecycle events and calls your process() method
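A minimal sketch of the guidance above. So that it runs without the gem, SketchBase is a hypothetical stand-in that mirrors only the handle() to process() delegation of Tasker::StepHandler::Base (no logging, no events, no result storage). The handler names and the plain Hash used as a task are illustrative assumptions too. In a real application the handlers would subclass Tasker::StepHandler::Base.

```ruby
module HandlerSketch
  # Hypothetical stand-in for Tasker::StepHandler::Base.
  class SketchBase
    # Framework entry point: subclasses leave this alone.
    def handle(task, sequence, step)
      process(task, sequence, step)
    end

    # Extension point: the base version only raises.
    def process(_task, _sequence, _step)
      raise NotImplementedError, 'Subclasses must implement the process method.'
    end
  end

  # A handler author writes process() and nothing else.
  class ValidateOrderHandler < SketchBase
    def process(task, _sequence, _step)
      { order_id: task[:order_id], valid: true }
    end
  end

  # Forgetting process() surfaces as NotImplementedError at run time.
  class ForgotProcessHandler < SketchBase; end
end

result = HandlerSketch::ValidateOrderHandler.new.handle({ order_id: 42 }, nil, nil)
puts result[:valid]

begin
  HandlerSketch::ForgotProcessHandler.new.handle({}, nil, nil)
rescue NotImplementedError => e
  puts e.message
end
```

Callers always go through handle(); the subclass never needs to know what else the framework does around its process() call.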

📊 STRUCTURED LOGGING AVAILABLE: All step handlers automatically include structured logging capabilities:

  • Use log_structured() to emit structured logs with correlation IDs
  • Correlation ID is automatically extracted from the task for traceability
  • Example: log_structured(:info, "Processing order", order_id: order.id)

Direct Known Subclasses

Api

Constant Summary

Constants included from Concerns::StructuredLogging

Concerns::StructuredLogging::CORRELATION_ID_KEY

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Concerns::StructuredLogging

#correlation_id, #correlation_id=, #log_exception, #log_orchestration_event, #log_performance_event, #log_step_event, #log_structured, #log_task_event, #with_correlation_id

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Constructor Details

#initialize(config: nil) ⇒ Base

Creates a new step handler instance

Parameters:

  • config (Object, nil) (defaults to: nil)

    Optional configuration for the handler



# File 'lib/tasker/step_handler/base.rb', line 77

def initialize(config: nil)
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly, protected)

Access to configuration passed during initialization



# File 'lib/tasker/step_handler/base.rb', line 203

def config
  @config
end

Class Method Details

.custom_event_configuration ⇒ Array<Hash>

Class method that step handlers can override to declare custom events

✅ OVERRIDE THIS METHOD: To declare custom events your step handler publishes

Return an array of hashes where each hash defines a custom event:

  • name: The event name (without namespace)
  • description: Human-readable description of when this event is published

Example:

  def self.custom_event_configuration
    [
      {
        name: 'payment.processed',
        description: 'Published when payment processing completes successfully'
      },
      {
        name: 'payment.risk_flagged',
        description: 'Published when payment is flagged for manual review'
      }
    ]
  end

Returns:

  • (Array<Hash>)

    Array of custom event definitions



# File 'lib/tasker/step_handler/base.rb', line 196

def self.custom_event_configuration
  []
end

Instance Method Details

#handle(task, sequence, step) ⇒ Object

Framework method that coordinates step execution with automatic event publishing

⚠️ NEVER OVERRIDE THIS METHOD IN SUBCLASSES

This method is framework-only code that:

  1. Is automatically wrapped with event publishing via AutomaticEventPublishing
  2. Calls the developer-implemented process() method
  3. Handles framework-level concerns like logging and error propagation
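Item 1 can be pictured with Ruby's Module#prepend. The sketch below illustrates that general technique only and is an assumption, not the gem's AutomaticEventPublishing code: PublishingWrapper, EVENTS and SketchHandler are hypothetical names, and the three event names are the ones listed in the process() description on this page.

```ruby
module EventSketch
  EVENTS = [] # records published event names, in order

  # Hypothetical wrapper: prepended so it runs around the class's own handle().
  module PublishingWrapper
    def handle(task, sequence, step)
      EVENTS << :step_started
      result = super
      EVENTS << :step_completed
      result
    rescue StandardError
      EVENTS << :step_failed
      raise # error propagation: the caller still sees the exception
    end
  end

  class SketchHandler
    prepend PublishingWrapper

    def handle(task, sequence, step)
      process(task, sequence, step)
    end

    def process(_task, _sequence, step)
      raise ArgumentError, 'bad step' if step == :bad

      :ok
    end
  end
end

handler = EventSketch::SketchHandler.new
handler.handle(nil, nil, :good)
begin
  handler.handle(nil, nil, :bad)
rescue ArgumentError
  # expected: the failure is recorded and then re-raised
end
p EventSketch::EVENTS
# prints [:step_started, :step_completed, :step_started, :step_failed]
```

In this sketch, a subclass that redefined handle() without calling super would bypass the wrapper entirely, which is one plausible reason for the "never override" rule.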

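Parameters:

  • task

    The task the step belongs to; task.task_id is used as the logging correlation ID

  • sequence

    The step sequence, passed through unchanged to process()

  • step (Tasker::WorkflowStep)

    The step being executed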

Returns:

  • (Object)

    The result of processing the step



# File 'lib/tasker/step_handler/base.rb', line 93

def handle(task, sequence, step)
  log_structured(
    :debug,
    'Starting step execution',
    step_id: step.workflow_step_id,
    step_name: step.name,
    correlation_id: task.task_id
  )

  # Store initial results state to detect if developer set them manually
  initial_results = step.results

  # Call the developer-implemented process method
  process_output = process(task, sequence, step)

  # Process results using overridable method, respecting developer customization
  process_results(step, process_output, initial_results)

  log_structured(
    :debug,
    'Completed step execution',
    step_id: step.workflow_step_id,
    step_name: step.name,
    correlation_id: task.task_id,
    has_results: step.results.present?
  )

  process_output
end

#process(task, sequence, step) ⇒ Object

Developer extension point - implement your business logic here

✅ ALWAYS IMPLEMENT THIS METHOD IN SUBCLASSES

This is where you put your step's business logic. The framework will:

  • Automatically publish step_started before calling this method
  • Automatically publish step_completed after this method succeeds
  • Automatically publish step_failed if this method raises an exception

Return your results from this method - they will be stored in step.results automatically via process_results(), unless process() has already changed step.results itself, in which case the value you set is kept. You can override process_results() to customize how the return value gets stored.
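The two storage paths can be sketched as follows. SketchBase is a hypothetical stand-in that copies the handle() and process_results() logic from the source listings on this page, minus logging and events; Step, ReturnsValue and SetsResults are illustrative names, not part of the gem.

```ruby
module ResultsSketch
  Step = Struct.new(:name, :results)

  # Hypothetical stand-in for Tasker::StepHandler::Base.
  class SketchBase
    def handle(task, sequence, step)
      initial_results = step.results
      output = process(task, sequence, step)
      process_results(step, output, initial_results)
      output
    end

    def process_results(step, process_output, initial_results)
      # Results changed during process() are left alone.
      return if step.results != initial_results

      step.results = process_output
    end
  end

  # Relies on the default: the return value becomes step.results.
  class ReturnsValue < SketchBase
    def process(_task, _sequence, _step)
      { total: 3 }
    end
  end

  # Assigns step.results itself; the return value is then not stored.
  class SetsResults < SketchBase
    def process(_task, _sequence, step)
      step.results = { total: 3, source: 'manual' }
      :not_stored
    end
  end
end

auto = ResultsSketch::Step.new('sum', nil)
ResultsSketch::ReturnsValue.new.handle(nil, nil, auto)
puts auto.results[:total]

manual = ResultsSketch::Step.new('sum', nil)
returned = ResultsSketch::SetsResults.new.handle(nil, nil, manual)
puts manual.results[:source]
puts returned
```

Note that handle() still returns whatever process() returned, even when that value was not written to step.results.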

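Parameters:

  • task

    The task the step belongs to

  • sequence

    The step sequence, as received by handle()

  • step (Tasker::WorkflowStep)

    The step being executed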

Returns:

  • (Object)

    The results of processing - will be stored in step.results

Raises:

  • (NotImplementedError)

    If not implemented by a subclass



# File 'lib/tasker/step_handler/base.rb', line 140

def process(task, sequence, step)
  raise NotImplementedError,
        'Subclasses must implement the process method. This is your extension point for business logic.'
end

#process_results(step, process_output, initial_results) ⇒ void

This method returns an undefined value.

Processes the output of the process() method and stores it in step.results

✅ OVERRIDE THIS METHOD: To customize how process() output is stored

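This method provides a clean extension point for customizing how the return value from your process() method gets stored in step.results. The default behavior is to store the returned value directly, unless step.results no longer equals initial_results (that is, it was set inside process()), in which case the existing value is left untouched.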
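A hedged sketch of such an override, wrapping the raw output in an envelope before it is stored. SketchBase is a hypothetical stand-in for Tasker::StepHandler::Base that reproduces only the call order of handle(); Step, EnvelopeHandler and the schema_version key are illustrative assumptions.

```ruby
module OverrideSketch
  Step = Struct.new(:name, :results)

  # Hypothetical stand-in for Tasker::StepHandler::Base.
  class SketchBase
    def handle(task, sequence, step)
      initial_results = step.results
      output = process(task, sequence, step)
      process_results(step, output, initial_results)
      output
    end

    # Default behavior, as in the source listing (logging omitted).
    def process_results(step, process_output, initial_results)
      return if step.results != initial_results

      step.results = process_output
    end
  end

  class EnvelopeHandler < SketchBase
    def process(_task, _sequence, _step)
      { status: 'shipped' }
    end

    # Custom storage: wrap the raw output instead of storing it directly.
    def process_results(step, process_output, _initial_results)
      step.results = { data: process_output, schema_version: 1 }
    end
  end
end

step = OverrideSketch::Step.new('ship_order', nil)
returned = OverrideSketch::EnvelopeHandler.new.handle(nil, nil, step)
puts returned[:status]
puts step.results[:schema_version]
```

The caller of handle() still receives the unwrapped return value; only what lands in step.results changes.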

Parameters:

  • step (Tasker::WorkflowStep)

    The current step

  • process_output (Object)

    The return value from process() method

  • initial_results (Object)

    The value of step.results before process() was called



# File 'lib/tasker/step_handler/base.rb', line 157

def process_results(step, process_output, initial_results)
  # If developer already set step.results in their process() method, respect it
  if step.results != initial_results
    log_structured(
      :debug,
      'Developer set custom results in process() method',
      step_id: step.workflow_step_id,
      step_name: step.name
    )
    return
  end

  # Default behavior: store the return value from process()
  step.results = process_output
end