Class: Tasker::Events::EventPayloadBuilder

Inherits:
Object
Defined in:
lib/tasker/events/event_payload_builder.rb

Overview

EventPayloadBuilder creates standardized event payloads so that telemetry is consistent across event publishers.

This class solves the payload standardization issues identified during factory migration:

  • Missing :execution_duration keys in step completion events

  • Missing :error_message keys in step failure events

  • Missing :attempt_number keys for retry tracking

  • Inconsistent payload structure across different event publishers

Usage:

payload = EventPayloadBuilder.build_step_payload(step, task, event_type: :completed)
Tasker::Events::Publisher.instance.publish('step.completed', payload)

Defined Under Namespace

Classes: ErrorInfoExtractor, StepPayloadBuilder

Class Method Details

.build_orchestration_payload(event_type:, context: {}) ⇒ Hash

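Build a standardized payload for workflow orchestration events. The keys of context are merged in last, so on a key collision they override the default metadata keys (event_type, orchestration_event, timestamp).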

Parameters:

  • event_type (Symbol)

    The orchestration event type

  • context (Hash) (defaults to: {})

    The orchestration context

Returns:

  • (Hash)

    Standardized orchestration payload



# File 'lib/tasker/events/event_payload_builder.rb', line 78

def build_orchestration_payload(event_type:, context: {})
  {
    # Event metadata
    event_type: event_type.to_s,
    orchestration_event: true,
    timestamp: Time.current.iso8601,

    # Merge provided context
    **context
  }
end
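
The merge order above can be tried outside Rails with a minimal sketch. It assumes Time.now.utc as a stand-in for ActiveSupport's Time.current, and the method name, event name, and context keys are hypothetical, chosen only for illustration:

```ruby
require 'time' # adds Time#iso8601

# Stand-alone re-creation of the method body shown above.
# Assumption: Time.now.utc replaces Time.current so no Rails is needed.
def sketch_orchestration_payload(event_type:, context: {})
  {
    event_type: event_type.to_s,
    orchestration_event: true,
    timestamp: Time.now.utc.iso8601,

    # Splatted last: a colliding key in context wins over the defaults.
    **context
  }
end

# Hypothetical event name and context keys.
plain = sketch_orchestration_payload(
  event_type: :steps_discovered,
  context: { task_id: 42, step_count: 3 }
)

# A context that reuses :event_type replaces the default value.
overridden = sketch_orchestration_payload(
  event_type: :steps_discovered,
  context: { event_type: 'custom' }
)
```

Because the splat comes after the metadata keys, callers that pass arbitrary context hashes may want to avoid the three reserved keys unless an override is intended.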

.build_step_payload(step, task = nil, event_type: :completed, additional_context: {}) ⇒ Hash

Build standardized payload for step events

Parameters:

  • step (WorkflowStep)

    The step object

  • task (Task) (defaults to: nil)

    The associated task

  • event_type (Symbol) (defaults to: :completed)

    The type of event (:started, :completed, :failed, :retry, etc.)

  • additional_context (Hash) (defaults to: {})

    Additional context to merge in

Returns:

  • (Hash)

    Standardized event payload



# File 'lib/tasker/events/event_payload_builder.rb', line 25

def build_step_payload(step, task = nil, event_type: :completed, additional_context: {})
  StepPayloadBuilder.build(step, task, event_type, additional_context)
end

.build_task_payload(task, event_type: :completed, additional_context: {}) ⇒ Hash

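Build a standardized payload for task events. Timing and step statistics are included for every event type; :completed, :failed, and :started add event-specific fields, and additional_context is merged last so it can override any key.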

Parameters:

  • task (Task)

    The task object

  • event_type (Symbol) (defaults to: :completed)

    The type of event (:started, :completed, :failed, etc.)

  • additional_context (Hash) (defaults to: {})

    Additional context to merge in

Returns:

  • (Hash)

    Standardized event payload



# File 'lib/tasker/events/event_payload_builder.rb', line 35

def build_task_payload(task, event_type: :completed, additional_context: {})
  # Get all completion stats in a single optimized query
  stats = Tasker::WorkflowStep.task_completion_stats(task)

  base_payload = {
    # Core identifiers
    task_id: task.task_id,
    task_name: get_task_name(task),

    # Timing information - only set completed_at for fully completed tasks
    started_at: task.created_at&.iso8601,
    completed_at: stats[:all_complete] ? stats[:latest_completion_time]&.iso8601 : nil,

    # Task metadata
    task_type: task.class.name,
    event_type: event_type.to_s,
    timestamp: Time.current.iso8601
  }

  # Add duration and step statistics to all task events
  base_payload.merge!(build_task_timing_and_statistics(task, stats))

  # Add event-specific fields
  case event_type
  when :completed
    base_payload.merge!(build_task_completion_specific_payload(task))
  when :failed
    base_payload.merge!(build_task_failure_payload(task, additional_context))
  when :started
    base_payload.merge!(build_task_start_payload(task))
  end

  # Merge additional context, allowing overrides
  base_payload.merge!(additional_context)

  base_payload
end
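
The layering in build_task_payload (base fields, then event-specific fields, then additional_context) can be illustrated with a simplified sketch. It is not the library's implementation: the method name and the :outcome field are hypothetical stand-ins for whatever the private helpers return, and the statistics query is omitted.

```ruby
# Simplified sketch of the three merge layers. Only task_id and event_type
# mirror the real payload; :outcome is a hypothetical event-specific field.
def sketch_task_payload(task_id, event_type: :completed, additional_context: {})
  payload = { task_id: task_id, event_type: event_type.to_s }

  # Layer 2: event-specific fields. `case` compares with ===, so only the
  # Symbols listed here match; a String such as 'completed' falls through.
  case event_type
  when :completed
    payload[:outcome] = 'success'
  when :failed
    payload[:outcome] = 'failure'
  when :started
    payload[:outcome] = 'pending'
  end

  # Layer 3: merged last, so caller-supplied keys override earlier layers.
  payload.merge!(additional_context)
  payload
end

completed = sketch_task_payload(1)
overridden = sketch_task_payload(1, event_type: :failed,
                                    additional_context: { outcome: 'timeout' })
from_string = sketch_task_payload(1, event_type: 'completed')
```

In this sketch, from_string carries event_type 'completed' but no event-specific field, which suggests passing event_type as a Symbol when calling the real method.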