Module: Temporalio::Workflow
- Defined in:
- lib/temporalio/workflow.rb,
lib/temporalio/workflow/info.rb,
lib/temporalio/workflow/future.rb,
lib/temporalio/workflow/definition.rb,
lib/temporalio/workflow/update_info.rb,
lib/temporalio/workflow/parent_close_policy.rb,
lib/temporalio/workflow/child_workflow_handle.rb,
lib/temporalio/workflow/external_workflow_handle.rb,
lib/temporalio/workflow/handler_unfinished_policy.rb,
lib/temporalio/workflow/activity_cancellation_type.rb,
lib/temporalio/workflow/child_workflow_cancellation_type.rb
Overview
Module containing all class methods that can be called from within a workflow. Methods on this module cannot be used outside of a workflow, with the obvious exception of Workflow.in_workflow?. This module is not meant to be included or mixed in.
Defined Under Namespace
Modules: ActivityCancellationType, ChildWorkflowCancellationType, HandlerUnfinishedPolicy, ParentClosePolicy, Unsafe
Classes: ChildWorkflowHandle, ContinueAsNewError, Definition, ExternalWorkflowHandle, Future, Info, InvalidWorkflowStateError, NondeterminismError, UpdateInfo
Class Method Summary
-
.all_handlers_finished? ⇒ Boolean
Whether all update and signal handlers have finished executing.
-
.cancellation ⇒ Cancellation
Cancellation for the workflow.
-
.continue_as_new_suggested ⇒ Boolean
Whether continue as new is suggested.
-
.current_history_length ⇒ Integer
Current number of events in history.
-
.current_history_size ⇒ Integer
Current history size in bytes.
-
.current_update_info ⇒ UpdateInfo
Current update info if this code is running inside an update.
-
.deprecate_patch(patch_id) ⇒ Object
Mark a patch as deprecated.
-
.execute_activity(activity, *args, task_queue: info.task_queue, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, heartbeat_timeout: nil, retry_policy: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil, disable_eager_execution: false) ⇒ Object
Execute an activity and return its result.
-
.execute_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ Object
Shortcut for Workflow.start_child_workflow + ChildWorkflowHandle#result.
-
.execute_local_activity(activity, *args, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, retry_policy: nil, local_retry_threshold: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil) ⇒ Object
Execute an activity locally in this same workflow task and return its result.
-
.external_workflow_handle(workflow_id, run_id: nil) ⇒ ExternalWorkflowHandle
Get a handle to an external workflow for canceling and issuing signals.
-
.in_workflow? ⇒ Boolean
Whether the current code is executing in a workflow.
-
.info ⇒ Info
Information about the current workflow.
-
.instance ⇒ Definition?
Workflow class instance.
-
.logger ⇒ Logger
Logger for the workflow.
-
.memo ⇒ Hash{String, Symbol => Object}
Memo for the workflow.
-
.metric_meter ⇒ Metric::Meter
Metric meter to create metrics on.
-
.now ⇒ Time
Current UTC time for this workflow.
-
.patched(patch_id) ⇒ Boolean
Patch a workflow.
-
.payload_converter ⇒ Converters::PayloadConverter
Payload converter for the workflow.
-
.query_handlers ⇒ Hash<String, Definition::Query>
Query handlers for this workflow.
-
.random ⇒ Random
Deterministic instance of Random for use in a workflow.
-
.search_attributes ⇒ SearchAttributes
Search attributes for the workflow.
-
.signal_handlers ⇒ Hash<String, Definition::Signal>
Signal handlers for this workflow.
-
.sleep(duration, summary: nil, cancellation: Workflow.cancellation) ⇒ Object
Sleep in a workflow for the given time.
-
.start_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ ChildWorkflowHandle
Start a child workflow and return the handle.
-
.timeout(duration, exception_class = Timeout::Error, message = 'execution expired', summary: 'Timeout timer') { ... } ⇒ Object
Run the block until the timeout is reached.
-
.update_handlers ⇒ Hash<String, Definition::Update>
Update handlers for this workflow.
-
.upsert_memo(hash) ⇒ Object
Issue updates to the workflow memo.
-
.upsert_search_attributes(*updates) ⇒ Object
Issue updates to the workflow search attributes.
-
.wait_condition(cancellation: Workflow.cancellation) { ... } ⇒ Object
Wait for the given block to return a "truthy" value (i.e. any value other than `false` or `nil`).
Class Method Details
.all_handlers_finished? ⇒ Boolean
Whether all update and signal handlers have finished executing. Consider waiting on this condition before the workflow returns or continues as new, so that in-progress handlers are not interrupted: `Temporalio::Workflow.wait_condition { Temporalio::Workflow.all_handlers_finished? }`
# File 'lib/temporalio/workflow.rb', line 24
def self.all_handlers_finished?
  _current.all_handlers_finished?
end
.cancellation ⇒ Cancellation
Returns the cancellation for the workflow. It is canceled when a workflow cancellation request is received, and it is the default cancellation for most workflow calls.
# File 'lib/temporalio/workflow.rb', line 30
def self.cancellation
  _current.cancellation
end
.continue_as_new_suggested ⇒ Boolean
Returns whether continue-as-new is suggested. The value reflects the suggestion as of the current task, so it may not be up to date when accessed in a query. When continue-as-new is suggested depends on server-side configuration.
# File 'lib/temporalio/workflow.rb', line 37
def self.continue_as_new_suggested
  _current.continue_as_new_suggested
end
.current_history_length ⇒ Integer
Returns the current number of events in history. The value reflects the event count as of the current task, so it may not be up to date when accessed in a query.
# File 'lib/temporalio/workflow.rb', line 43
def self.current_history_length
  _current.current_history_length
end
.current_history_size ⇒ Integer
Returns the current history size in bytes. The value reflects the history size as of the current task, so it may not be up to date when accessed in a query.
# File 'lib/temporalio/workflow.rb', line 49
def self.current_history_size
  _current.current_history_size
end
.current_update_info ⇒ UpdateInfo
Returns the current update info if this code is running inside an update. It is kept in Fiber-local storage, so it is only visible to the current handler fiber.
# File 'lib/temporalio/workflow.rb', line 55
def self.current_update_info
  _current.current_update_info
end
.deprecate_patch(patch_id) ⇒ Object
Mark a patch as deprecated.
This marks a patch made in a previous version of the code as no longer applicable, because all workflows that use the old code path are done and will never be queried again. The old code path can therefore be removed as well.
# File 'lib/temporalio/workflow.rb', line 66
def self.deprecate_patch(patch_id)
  _current.deprecate_patch(patch_id)
end
.execute_activity(activity, *args, task_queue: info.task_queue, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, heartbeat_timeout: nil, retry_policy: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil, disable_eager_execution: false) ⇒ Object
Execute an activity and return its result. Either `start_to_close_timeout` or `schedule_to_close_timeout` must be set. The `heartbeat_timeout` should be set for any non-immediately-completing activity so it can receive cancellation. To run an activity in the background, use a Future.
Note: Using an already-canceled cancellation may give a different exception than canceling after started. Use Error.canceled? to check if the exception is a cancellation either way.
# File 'lib/temporalio/workflow.rb', line 106
def self.execute_activity(
  activity,
  *args,
  task_queue: info.task_queue,
  schedule_to_close_timeout: nil,
  schedule_to_start_timeout: nil,
  start_to_close_timeout: nil,
  heartbeat_timeout: nil,
  retry_policy: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ActivityCancellationType::TRY_CANCEL,
  activity_id: nil,
  disable_eager_execution: false
)
  _current.execute_activity(
    activity, *args,
    task_queue:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
    heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:
  )
end
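The timeout requirements above can be sketched with a small helper intended to be called from inside a workflow method. This is a minimal illustration, not SDK code: the helper name `charge_card`, the activity name `'ChargeCard'`, and the `workflow:` keyword are all hypothetical. The `workflow:` keyword exists only so the helper can be exercised with a stand-in object outside a worker; it also assumes the activity can be referenced by its string name and that timeouts are given in seconds.

```ruby
# Hypothetical helper meant to be called from inside a workflow method.
# `workflow:` is an illustrative seam (not part of the SDK); by default it
# resolves to the Temporalio::Workflow module documented on this page.
def charge_card(order_id, workflow: Temporalio::Workflow)
  workflow.execute_activity(
    'ChargeCard',                # hypothetical activity name
    order_id,
    start_to_close_timeout: 30,  # satisfies the "one of two timeouts" requirement
    heartbeat_timeout: 5         # lets a longer-running activity receive cancellation
  )
end
```

Because the default value of `workflow:` is only evaluated when the keyword is omitted, the helper can be loaded and unit-tested without a running worker by passing any object that responds to `execute_activity`.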
.execute_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ Object
Shortcut for start_child_workflow + Temporalio::Workflow::ChildWorkflowHandle#result. See those two calls for more details.
# File 'lib/temporalio/workflow.rb', line 128
def self.execute_child_workflow(
  workflow,
  *args,
  id: random.uuid,
  task_queue: info.task_queue,
  cancellation: Workflow.cancellation,
  cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED,
  parent_close_policy: ParentClosePolicy::TERMINATE,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil
)
  start_child_workflow(
    workflow, *args,
    id:, task_queue:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:,
    task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:
  ).result
end
.execute_local_activity(activity, *args, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, start_to_close_timeout: nil, retry_policy: nil, local_retry_threshold: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil) ⇒ Object
Execute an activity locally in this same workflow task and return its result. This should usually only be used for short/simple activities where the result performance matters. Either `start_to_close_timeout` or `schedule_to_close_timeout` must be set. To run an activity in the background, use a Future.
Note: Using an already-canceled cancellation may give a different exception than canceling after started. Use Error.canceled? to check if the exception is a cancellation either way.
# File 'lib/temporalio/workflow.rb', line 184
def self.execute_local_activity(
  activity,
  *args,
  schedule_to_close_timeout: nil,
  schedule_to_start_timeout: nil,
  start_to_close_timeout: nil,
  retry_policy: nil,
  local_retry_threshold: nil,
  cancellation: Workflow.cancellation,
  cancellation_type: ActivityCancellationType::TRY_CANCEL,
  activity_id: nil
)
  _current.execute_local_activity(
    activity, *args,
    schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
    retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:
  )
end
.external_workflow_handle(workflow_id, run_id: nil) ⇒ ExternalWorkflowHandle
Get a handle to an external workflow for canceling and issuing signals.
# File 'lib/temporalio/workflow.rb', line 209
def self.external_workflow_handle(workflow_id, run_id: nil)
  _current.external_workflow_handle(workflow_id, run_id:)
end
.in_workflow? ⇒ Boolean
Returns whether the current code is executing in a workflow.
# File 'lib/temporalio/workflow.rb', line 214
def self.in_workflow?
  _current_or_nil != nil
end
.info ⇒ Info
Returns information about the current workflow.
# File 'lib/temporalio/workflow.rb', line 219
def self.info
  _current.info
end
.instance ⇒ Definition?
Returns the workflow class instance. This should always be present except in Temporalio::Worker::Interceptor::Workflow::Inbound#init, where it will be nil.
# File 'lib/temporalio/workflow.rb', line 225
def self.instance
  _current.instance
end
.logger ⇒ Logger
Returns the logger for the workflow. This is a scoped logger that automatically appends workflow details to every log and takes care not to log during replay.
# File 'lib/temporalio/workflow.rb', line 231
def self.logger
  _current.logger
end
.memo ⇒ Hash{String, Symbol => Object}
Returns the memo for the workflow. This is a read-only view of the memo. To update the memo, use upsert_memo. This always returns the same instance and updates are reflected on the returned instance, so it is not technically frozen.
# File 'lib/temporalio/workflow.rb', line 238
def self.memo
  _current.memo
end
.metric_meter ⇒ Metric::Meter
Returns the metric meter to create metrics on. This metric meter already contains some workflow-specific attributes and takes care not to apply metrics during replay.
# File 'lib/temporalio/workflow.rb', line 244
def self.metric_meter
  _current.metric_meter
end
.now ⇒ Time
Returns the current UTC time for this workflow. This creates and returns a new Time instance every time it is invoked; it is not the same instance continually mutated.
# File 'lib/temporalio/workflow.rb', line 250
def self.now
  _current.now
end
.patched(patch_id) ⇒ Boolean
Patch a workflow.
When called, this returns true only if the code should take the newer path, which means the workflow is either not replaying or is replaying and has seen this patch before. Results for successive calls to this function for the same ID and workflow are memoized. Use deprecate_patch when all workflows are done and will never be queried again. The old code path can be removed at that time too.
# File 'lib/temporalio/workflow.rb', line 263
def self.patched(patch_id)
  _current.patched(patch_id)
end
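The branching described above can be sketched as follows. This is a minimal illustration under stated assumptions: the helper `notify_user`, the patch ID `'notify-v2'`, both activity names, and the `workflow:` keyword are hypothetical and not part of the SDK. The `workflow:` keyword is a seam that defaults to the module documented here and lets the helper run against a stand-in object.

```ruby
# Hypothetical helper meant to be called from inside a workflow method.
# `workflow:` is an illustrative seam (not part of the SDK).
def notify_user(user_id, workflow: Temporalio::Workflow)
  if workflow.patched('notify-v2') # hypothetical patch ID
    # Newer path: taken when not replaying, or when replay has seen this patch.
    workflow.execute_activity('SendNotificationV2', user_id, start_to_close_timeout: 10)
  else
    # Older path: kept so histories recorded before the patch still replay.
    workflow.execute_activity('SendNotificationV1', user_id, start_to_close_timeout: 10)
  end
end
```

Once no workflow can take the older branch any more, the documented next step is to replace the check with deprecate_patch and delete the `else` branch.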
.payload_converter ⇒ Converters::PayloadConverter
Returns the payload converter for the workflow.
# File 'lib/temporalio/workflow.rb', line 268
def self.payload_converter
  _current.payload_converter
end
.query_handlers ⇒ Hash<String, Definition::Query>
Returns the query handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_query` method is best.
# File 'lib/temporalio/workflow.rb', line 275
def self.query_handlers
  _current.query_handlers
end
.random ⇒ Random
Returns a deterministic instance of Random for use in a workflow. This instance should be accessed each time it is needed, not stored. This instance may be recreated with a different seed in special cases (e.g. workflow reset). Do not use any other randomization inside workflow code.
# File 'lib/temporalio/workflow.rb', line 282
def self.random
  _current.random
end
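As an illustration of "accessed each time, not stored", the sketch below adds random jitter to a workflow sleep. The helper name `sleep_with_jitter` and the `workflow:` keyword are hypothetical; the keyword is only a seam that defaults to the module documented here. It assumes durations are plain numbers of seconds.

```ruby
# Hypothetical helper meant to be called from inside a workflow method.
# `workflow:` is an illustrative seam (not part of the SDK).
def sleep_with_jitter(base_seconds, workflow: Temporalio::Workflow)
  # Fetch the Random on every call instead of caching it in a variable,
  # since the instance may be recreated with a different seed.
  jitter = workflow.random.rand * base_seconds # Float in [0, base_seconds)
  total = base_seconds + jitter
  workflow.sleep(total)
  total
end
```

Using the workflow-provided Random rather than `Kernel#rand` keeps the computed duration reproducible on replay, which is the reason the documentation forbids other sources of randomness.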
.search_attributes ⇒ SearchAttributes
Returns the search attributes for the workflow. This is a read-only view of the attributes. To update the attributes, use upsert_search_attributes. This always returns the same instance and updates are reflected on the returned instance, so it is not technically frozen.
# File 'lib/temporalio/workflow.rb', line 289
def self.search_attributes
  _current.search_attributes
end
.signal_handlers ⇒ Hash<String, Definition::Signal>
Returns the signal handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_signal` method is best.
# File 'lib/temporalio/workflow.rb', line 296
def self.signal_handlers
  _current.signal_handlers
end
.sleep(duration, summary: nil, cancellation: Workflow.cancellation) ⇒ Object
Sleep in a workflow for the given time.
# File 'lib/temporalio/workflow.rb', line 310
def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation)
  _current.sleep(duration, summary:, cancellation:)
end
.start_child_workflow(workflow, *args, id: random.uuid, task_queue: info.task_queue, cancellation: Workflow.cancellation, cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED, parent_close_policy: ParentClosePolicy::TERMINATE, execution_timeout: nil, run_timeout: nil, task_timeout: nil, id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE, retry_policy: nil, cron_schedule: nil, memo: nil, search_attributes: nil) ⇒ ChildWorkflowHandle
Start a child workflow and return the handle.
# File 'lib/temporalio/workflow.rb', line 337
def self.start_child_workflow(
  workflow,
  *args,
  id: random.uuid,
  task_queue: info.task_queue,
  cancellation: Workflow.cancellation,
  cancellation_type: ChildWorkflowCancellationType::WAIT_CANCELLATION_COMPLETED,
  parent_close_policy: ParentClosePolicy::TERMINATE,
  execution_timeout: nil,
  run_timeout: nil,
  task_timeout: nil,
  id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
  retry_policy: nil,
  cron_schedule: nil,
  memo: nil,
  search_attributes: nil
)
  _current.start_child_workflow(
    workflow, *args,
    id:, task_queue:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:,
    task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:
  )
end
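Since start_child_workflow returns a handle rather than the result, several children can be started before any result is awaited. The sketch below illustrates that pattern under stated assumptions: the helper `process_batches`, the child workflow name `'BatchWorkflow'`, the ID scheme, and the `workflow:` keyword are hypothetical, and it assumes the child can be referenced by its string name.

```ruby
# Hypothetical helper meant to be called from inside a workflow method.
# `workflow:` is an illustrative seam (not part of the SDK).
def process_batches(batch_ids, workflow: Temporalio::Workflow)
  # Start every child first so they can all make progress...
  handles = batch_ids.map do |batch_id|
    workflow.start_child_workflow(
      'BatchWorkflow',          # hypothetical child workflow name
      batch_id,
      id: "batch-#{batch_id}"   # explicit ID instead of the random.uuid default
    )
  end
  # ...then collect results in order via ChildWorkflowHandle#result.
  handles.map(&:result)
end
```

When only a single child is needed and nothing else has to happen while it runs, execute_child_workflow is the shorter equivalent of starting and then calling `result`.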
.timeout(duration, exception_class = Timeout::Error, message = 'execution expired', summary: 'Timeout timer') { ... } ⇒ Object
Run the block until the timeout is reached. This is backed by sleep. This does not accept cancellation because it is expected the block within will properly handle/bubble cancellation.
# File 'lib/temporalio/workflow.rb', line 376
def self.timeout(
  duration,
  exception_class = Timeout::Error,
  message = 'execution expired',
  summary: 'Timeout timer',
  &
)
  _current.timeout(duration, exception_class, message, summary:, &)
end
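A common pairing is to bound a wait_condition with timeout and rescue the default exception class. The following is a minimal sketch: the helper `await_approval`, the `state` hash (standing in for workflow instance state set by a handler), and the `workflow:` keyword are hypothetical, and the duration is assumed to be in seconds.

```ruby
require 'timeout' # provides Timeout::Error, the documented default exception class

# Hypothetical helper meant to be called from inside a workflow method.
# `workflow:` is an illustrative seam (not part of the SDK).
def await_approval(state, workflow: Temporalio::Workflow)
  workflow.timeout(60) do
    # The block must stay side-effect free; it only reads state.
    workflow.wait_condition { state[:approved] }
  end
  :approved
rescue Timeout::Error
  :timed_out
end
```

Rescuing only `Timeout::Error` leaves cancellation and other errors raised inside the block free to propagate, in line with the note that the block is expected to handle or bubble cancellation itself.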
.update_handlers ⇒ Hash<String, Definition::Update>
Returns the update handlers for this workflow. This hash is mostly immutable except for `[]=` (and `store`), which can be used to set a new handler, or can be set with `nil` to remove a handler. For most use cases, defining a handler as a `workflow_update` method is best.
# File 'lib/temporalio/workflow.rb', line 389
def self.update_handlers
  _current.update_handlers
end
.upsert_memo(hash) ⇒ Object
Issue updates to the workflow memo.
# File 'lib/temporalio/workflow.rb', line 397
def self.upsert_memo(hash)
  _current.upsert_memo(hash)
end
.upsert_search_attributes(*updates) ⇒ Object
Issue updates to the workflow search attributes.
# File 'lib/temporalio/workflow.rb', line 405
def self.upsert_search_attributes(*updates)
  _current.upsert_search_attributes(*updates)
end
.wait_condition(cancellation: Workflow.cancellation) { ... } ⇒ Object
Wait for the given block to return a "truthy" value (i.e. any value other than `false` or `nil`). The block must be side-effect free since it may be invoked frequently during event loop iteration. To time out a wait, timeout can be used. This cannot be used in side-effect-free contexts such as `initialize`, queries, or update validators.
This is very commonly used to wait on a value to be set by a handler, e.g. `Temporalio::Workflow.wait_condition { @some_value }`. Special care was taken to only wake up a single wait condition when it evaluates to true. Therefore, if multiple wait conditions are waiting on the same thing, only one is awoken at a time, which means the code immediately following that wait condition can change the variable before other wait conditions are evaluated. This is a useful property for building mutexes/semaphores.
# File 'lib/temporalio/workflow.rb', line 426
def self.wait_condition(cancellation: Workflow.cancellation, &)
  raise 'Block required' unless block_given?

  _current.wait_condition(cancellation:, &)
end
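The mutex use mentioned above can be sketched with a small lock class. This is an illustration, not something the SDK ships: the class name `WorkflowLock` and the `workflow:` keyword are hypothetical. It relies on the documented property that only one waiter is awoken at a time, so the flag can be set before any other waiter's condition is evaluated.

```ruby
# Hypothetical workflow-safe lock built on wait_condition.
# `workflow:` is an illustrative seam (not part of the SDK).
class WorkflowLock
  def initialize(workflow: Temporalio::Workflow)
    @workflow = workflow
    @held = false
  end

  # Runs the block while holding the lock and returns the block's value.
  def synchronize
    # Side-effect-free condition: only reads the flag.
    @workflow.wait_condition { !@held }
    # Runs right after wake-up, before other waiters are evaluated.
    @held = true
    begin
      yield
    ensure
      @held = false # release even if the block raises
    end
  end
end
```

A workflow could hold one such lock in an instance variable and wrap the body of a signal or update handler in `synchronize` so that handlers which await activities do not interleave. The lock is not reentrant: calling `synchronize` again from inside the block would wait forever.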