Class: SolidFlow::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/solid_flow/task.rb

Overview

Base class for all stateless tasks executed out-of-band from the workflow process.

Defined Under Namespace

Classes: TaskContext

Constant Summary collapse

# Thread.current key that .current_context reads to find the active task context.
THREAD_KEY = :__solidflow_task_context__

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



12
13
14
# File 'lib/solid_flow/task.rb', line 12

# Raw reader for the @queue_name instance variable; applies no fallback, so it
# is nil when no queue has been assigned.
#
# @return [Object, nil] the stored queue name
def queue_name = @queue_name

.retry_config ⇒ Object (readonly)

Returns the value of attribute retry_config.



12
13
14
# File 'lib/solid_flow/task.rb', line 12

# Raw reader for the @retry_config instance variable; nil when nothing has
# been stored.
#
# @return [Object, nil] the stored retry configuration
def retry_config = @retry_config

.task_symbol ⇒ Object (readonly)

Returns the value of attribute task_symbol.



12
13
14
# File 'lib/solid_flow/task.rb', line 12

# Raw reader for the @task_symbol instance variable.
#
# @return [Symbol, nil] the stored task symbol, or nil when none is set
def task_symbol = @task_symbol

.timeout_config ⇒ Object (readonly)

Returns the value of attribute timeout_config.



12
13
14
# File 'lib/solid_flow/task.rb', line 12

# Raw reader for the @timeout_config instance variable; nil when nothing has
# been stored.
#
# @return [Object, nil] the stored timeout configuration
def timeout_config = @timeout_config

Class Method Details

.current_context ⇒ Object



100
101
102
# File 'lib/solid_flow/task.rb', line 100

# Looks up the value stored under THREAD_KEY on the current thread.
#
# @return [Object, nil] the stored context, or nil when nothing is stored
def current_context = Thread.current[THREAD_KEY]

.default_task_symbol ⇒ Object



23
24
25
# File 'lib/solid_flow/task.rb', line 23

# Derives the default registry symbol from the receiver's name by applying
# demodulize, then underscore, then to_sym.
#
# NOTE(review): +name+ is nil for anonymous classes, in which case this raises
# NoMethodError — confirm whether anonymous subclasses need to be supported.
#
# @return [Symbol]
def default_task_symbol
  short_name = name.demodulize
  short_name.underscore.to_sym
end

.execute(arguments:, headers:) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/solid_flow/task.rb', line 56

# Runs the task once and reports its lifecycle through SolidFlow.instrument.
#
# Steps, in order:
# 1. Build a TaskContext from +headers+ (read with symbol keys).
# 2. Emit "solidflow.task.started".
# 3. Inside with_task_context, instantiate the task and call #perform with
#    +arguments+ passed as symbol-keyed keyword arguments.
# 4. Emit "solidflow.task.completed" (including the result) and return the result.
#
# Any StandardError raised by any of these steps is reported as
# "solidflow.task.failed" and re-raised as Errors::TaskFailure carrying the
# original class name and backtrace in +details+. Exceptions outside the
# StandardError hierarchy (NotImplementedError, for example, descends from
# ScriptError) are not rescued here and propagate unchanged.
#
# NOTE(review): +headers+ is not key-normalized the way +arguments+ is, so
# string-keyed headers would yield a context full of nils — confirm callers
# always pass symbol keys.
#
# @param arguments [Hash] keyword arguments for #perform; keys are symbolized
# @param headers [Hash] execution metadata (:execution_id, :step_name,
#   :attempt, :idempotency_key, :workflow_name, :metadata)
# @return [Object] whatever #perform returns
# @raise [Errors::TaskFailure] when any StandardError is raised during execution
def execute(arguments:, headers:)
  context = TaskContext.new(
    execution_id: headers[:execution_id],
    step_name: headers[:step_name],
    attempt: headers[:attempt],
    idempotency_key: headers[:idempotency_key],
    workflow_name: headers[:workflow_name],
    metadata: headers[:metadata] || {}
  )

  SolidFlow.instrument(
    "solidflow.task.started",
    task: task_symbol,
    execution_id: context.execution_id,
    step_name: context.step_name,
    attempt: context.attempt
  )

  result = with_task_context(context) do
    new.perform(**arguments.symbolize_keys)
  end

  SolidFlow.instrument(
    "solidflow.task.completed",
    task: task_symbol,
    execution_id: context.execution_id,
    step_name: context.step_name,
    attempt: context.attempt,
    result: result
  )

  result
rescue StandardError => e
  # This rescue also covers the TaskContext construction above, so +context+
  # can still be nil here. Safe navigation keeps the handler from raising a
  # second NoMethodError that would mask the original failure.
  SolidFlow.instrument(
    "solidflow.task.failed",
    task: task_symbol,
    execution_id: context&.execution_id,
    step_name: context&.step_name,
    attempt: context&.attempt,
    error: e
  )
  raise Errors::TaskFailure.new(e.message, details: { class: e.class.name, backtrace: e.backtrace })
end

.inherited(subclass) ⇒ Object



14
15
16
17
18
19
20
21
# File 'lib/solid_flow/task.rb', line 14

# Subclass hook: seeds the new subclass with the receiver's settings and
# registers it in SolidFlow.task_registry under its default symbol.
#
# timeout_config and retry_config are shallow-copied with dup (when present);
# queue_name is passed along as-is.
#
# @param subclass [Class] the class being created
def inherited(subclass)
  super

  symbol = subclass.default_task_symbol
  inherited_state = {
    :@timeout_config => timeout_config&.dup,
    :@retry_config => retry_config&.dup,
    :@queue_name => queue_name,
    :@task_symbol => symbol
  }
  inherited_state.each { |ivar, value| subclass.instance_variable_set(ivar, value) }

  SolidFlow.task_registry.register(symbol, subclass)
end

.queue(name = nil) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/solid_flow/task.rb', line 48

# Combined setter/getter for the queue.
#
# With a truthy +name+, stores it in @queue_name and returns it. Without one,
# returns the stored name, falling back to
# SolidFlow.configuration.default_task_queue when none is stored.
#
# @param name [Object, nil] queue to assign; omit to read
# @return [Object] the assigned or effective queue name
def queue(name = nil)
  return @queue_name = name if name

  @queue_name || SolidFlow.configuration.default_task_queue
end

.register_as(name) ⇒ Object



27
28
29
30
# File 'lib/solid_flow/task.rb', line 27

# Sets the task symbol to +name+ (converted with to_sym) and registers the
# receiver in SolidFlow.task_registry under that symbol.
#
# @param name [#to_sym] the name to register under
# @return [Object] whatever the registry's register call returns
def register_as(name)
  symbol = name.to_sym
  @task_symbol = symbol
  SolidFlow.task_registry.register(symbol, self)
end

.retry(options = nil) ⇒ Object



40
41
42
43
44
45
46
# File 'lib/solid_flow/task.rb', line 40

# Combined setter/getter for the retry configuration.
#
# With truthy +options+, stores a deep-symbolized copy in @retry_config and
# returns it. Without, returns the stored configuration, or an empty Hash when
# nothing is stored.
#
# @param options [Hash, nil] retry settings to store; omit to read
# @return [Hash] the stored configuration, or {} when none is stored
def retry(options = nil)
  return @retry_config || {} unless options

  @retry_config = options.deep_symbolize_keys
end

.timeout(value = nil) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/solid_flow/task.rb', line 32

# Combined setter/getter for the timeout configuration.
#
# A truthy +value+ is stored in @timeout_config. In every case the currently
# stored configuration is returned, which is +value+ right after an assignment.
#
# @param value [Object, nil] timeout setting to store; omit to read
# @return [Object, nil] the stored timeout configuration
def timeout(value = nil)
  @timeout_config = value if value
  @timeout_config
end

Instance Method Details

#current_attempt ⇒ Object



144
145
146
# File 'lib/solid_flow/task.rb', line 144

# Attempt value of the current task context.
#
# @return [Object, nil] the context's attempt, or nil when there is no context
def current_attempt = current_context&.attempt

#current_context ⇒ Object



132
133
134
# File 'lib/solid_flow/task.rb', line 132

# Instance-level shortcut that delegates to the class-level current_context.
#
# @return [Object, nil] whatever the class-level current_context returns
def current_context = self.class.current_context

#current_execution_id ⇒ Object



136
137
138
# File 'lib/solid_flow/task.rb', line 136

# Execution id of the current task context.
#
# @return [Object, nil] the context's execution_id, or nil when there is no context
def current_execution_id = current_context&.execution_id

#current_idempotency_key ⇒ Object



148
149
150
# File 'lib/solid_flow/task.rb', line 148

# Idempotency key of the current task context.
#
# @return [Object, nil] the context's idempotency_key, or nil when there is no context
def current_idempotency_key = current_context&.idempotency_key

#current_step_name ⇒ Object



140
141
142
# File 'lib/solid_flow/task.rb', line 140

# Step name of the current task context.
#
# @return [Object, nil] the context's step_name, or nil when there is no context
def current_step_name = current_context&.step_name

#perform(**) ⇒ Object

Raises:

  • (NotImplementedError)


128
129
130
# File 'lib/solid_flow/task.rb', line 128

# Placeholder for the task body. This base implementation accepts any keyword
# arguments, ignores them, and always raises.
#
# Note that NotImplementedError descends from ScriptError rather than
# StandardError, so a plain `rescue` / `rescue StandardError` does not catch it.
#
# @raise [NotImplementedError] always
def perform(**_kwargs)
  raise NotImplementedError, "Tasks must implement #perform"
end