Class: SolidFlow::Task
- Inherits:
-
Object
- Object
- SolidFlow::Task
- Defined in:
- lib/solid_flow/task.rb
Overview
Base class for all stateless tasks executed out-of-band from the workflow process.
Defined Under Namespace
Classes: TaskContext
Constant Summary collapse
- THREAD_KEY =
:__solidflow_task_context__
Class Attribute Summary collapse
-
.queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
.retry_config ⇒ Object
readonly
Returns the value of attribute retry_config.
-
.task_symbol ⇒ Object
readonly
Returns the value of attribute task_symbol.
-
.timeout_config ⇒ Object
readonly
Returns the value of attribute timeout_config.
Class Method Summary collapse
- .current_context ⇒ Object
- .default_task_symbol ⇒ Object
- .execute(arguments:, headers:) ⇒ Object
- .inherited(subclass) ⇒ Object
- .queue(name = nil) ⇒ Object
- .register_as(name) ⇒ Object
- .retry(options = nil) ⇒ Object
- .timeout(value = nil) ⇒ Object
Instance Method Summary collapse
- #current_attempt ⇒ Object
- #current_context ⇒ Object
- #current_execution_id ⇒ Object
- #current_idempotency_key ⇒ Object
- #current_step_name ⇒ Object
- #perform ⇒ Object
Class Attribute Details
.queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
12 13 14 |
# File 'lib/solid_flow/task.rb', line 12 def queue_name @queue_name end |
.retry_config ⇒ Object (readonly)
Returns the value of attribute retry_config.
12 13 14 |
# File 'lib/solid_flow/task.rb', line 12 def retry_config @retry_config end |
.task_symbol ⇒ Object (readonly)
Returns the value of attribute task_symbol.
12 13 14 |
# File 'lib/solid_flow/task.rb', line 12 def task_symbol @task_symbol end |
.timeout_config ⇒ Object (readonly)
Returns the value of attribute timeout_config.
12 13 14 |
# File 'lib/solid_flow/task.rb', line 12 def timeout_config @timeout_config end |
Class Method Details
.current_context ⇒ Object
100 101 102 |
# File 'lib/solid_flow/task.rb', line 100 def current_context Thread.current[THREAD_KEY] end |
.default_task_symbol ⇒ Object
23 24 25 |
# File 'lib/solid_flow/task.rb', line 23 def default_task_symbol name.demodulize.underscore.to_sym end |
.execute(arguments:, headers:) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/solid_flow/task.rb', line 56 def execute(arguments:, headers:) context = TaskContext.new( execution_id: headers[:execution_id], step_name: headers[:step_name], attempt: headers[:attempt], idempotency_key: headers[:idempotency_key], workflow_name: headers[:workflow_name], metadata: headers[:metadata] || {} ) SolidFlow.instrument( "solidflow.task.started", task: task_symbol, execution_id: context.execution_id, step_name: context.step_name, attempt: context.attempt ) result = with_task_context(context) do new.perform(**arguments.symbolize_keys) end SolidFlow.instrument( "solidflow.task.completed", task: task_symbol, execution_id: context.execution_id, step_name: context.step_name, attempt: context.attempt, result: ) result rescue StandardError => e SolidFlow.instrument( "solidflow.task.failed", task: task_symbol, execution_id: context.execution_id, step_name: context.step_name, attempt: context.attempt, error: e ) raise Errors::TaskFailure.new(e., details: { class: e.class.name, backtrace: e.backtrace }) end |
.inherited(subclass) ⇒ Object
14 15 16 17 18 19 20 21 |
# File 'lib/solid_flow/task.rb', line 14 def inherited(subclass) super subclass.instance_variable_set(:@timeout_config, timeout_config&.dup) subclass.instance_variable_set(:@retry_config, retry_config&.dup) subclass.instance_variable_set(:@queue_name, queue_name) subclass.instance_variable_set(:@task_symbol, subclass.default_task_symbol) SolidFlow.task_registry.register(subclass.default_task_symbol, subclass) end |
.queue(name = nil) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/solid_flow/task.rb', line 48 def queue(name = nil) if name @queue_name = name else @queue_name || SolidFlow.configuration.default_task_queue end end |
.register_as(name) ⇒ Object
27 28 29 30 |
# File 'lib/solid_flow/task.rb', line 27 def register_as(name) @task_symbol = name.to_sym SolidFlow.task_registry.register(@task_symbol, self) end |
.retry(options = nil) ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/solid_flow/task.rb', line 40 def retry( = nil) if @retry_config = .deep_symbolize_keys else @retry_config || {} end end |
.timeout(value = nil) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/solid_flow/task.rb', line 32 def timeout(value = nil) if value @timeout_config = value else @timeout_config end end |
Instance Method Details
#current_attempt ⇒ Object
144 145 146 |
# File 'lib/solid_flow/task.rb', line 144 def current_attempt current_context&.attempt end |
#current_context ⇒ Object
132 133 134 |
# File 'lib/solid_flow/task.rb', line 132 def current_context self.class.current_context end |
#current_execution_id ⇒ Object
136 137 138 |
# File 'lib/solid_flow/task.rb', line 136 def current_execution_id current_context&.execution_id end |
#current_idempotency_key ⇒ Object
148 149 150 |
# File 'lib/solid_flow/task.rb', line 148 def current_idempotency_key current_context&.idempotency_key end |
#current_step_name ⇒ Object
140 141 142 |
# File 'lib/solid_flow/task.rb', line 140 def current_step_name current_context&.step_name end |
#perform ⇒ Object
128 129 130 |
# File 'lib/solid_flow/task.rb', line 128 def perform(**) raise NotImplementedError, "Tasks must implement #perform" end |