Class: Temporalio::Internal::Worker::WorkflowInstance::Context
- Inherits:
-
Object
- Object
- Temporalio::Internal::Worker::WorkflowInstance::Context
- Defined in:
- lib/temporalio/internal/worker/workflow_instance/context.rb
Overview
Context for all workflow calls. All calls in the Workflow class should call a method on this class and then this class can delegate the call as needed to other parts of the workflow instance system.
Instance Method Summary collapse
- #_cancel_external_workflow(id:, run_id:) ⇒ Object
- #_outbound=(outbound) ⇒ Object
- #_signal_child_workflow(id:, signal:, args:, cancellation:, arg_hints:) ⇒ Object
- #_signal_external_workflow(id:, run_id:, signal:, args:, cancellation:, arg_hints:) ⇒ Object
- #all_handlers_finished? ⇒ Boolean
- #cancellation ⇒ Object
- #continue_as_new_suggested ⇒ Object
- #create_nexus_client(endpoint:, service:) ⇒ Object
- #current_deployment_version ⇒ Object
- #current_details ⇒ Object
- #current_details=(details) ⇒ Object
- #current_history_length ⇒ Object
- #current_history_size ⇒ Object
- #current_update_info ⇒ Object
- #deprecate_patch(patch_id) ⇒ Object
- #durable_scheduler_disabled ⇒ Object
- #execute_activity(activity, *args, task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:, priority:, arg_hints:, result_hint:) ⇒ Object
- #execute_local_activity(activity, *args, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:, arg_hints:, result_hint:) ⇒ Object
- #external_workflow_handle(workflow_id, run_id: nil) ⇒ Object
- #illegal_call_tracing_disabled ⇒ Object
- #info ⇒ Object
-
#initialize(instance) ⇒ Context
constructor
A new instance of Context.
- #initialize_continue_as_new_error(error) ⇒ Object
- #instance ⇒ Object
- #io_enabled ⇒ Object
- #logger ⇒ Object
- #memo ⇒ Object
- #metric_meter ⇒ Object
- #now ⇒ Object
- #patched(patch_id) ⇒ Object
- #payload_converter ⇒ Object
- #query_handlers ⇒ Object
- #random ⇒ Object
- #replaying? ⇒ Boolean
- #replaying_history_events? ⇒ Boolean
- #search_attributes ⇒ Object
- #signal_handlers ⇒ Object
- #sleep(duration, summary:, cancellation:) ⇒ Object
- #start_child_workflow(workflow, *args, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints:, result_hint:) ⇒ Object
- #storage ⇒ Object
- #timeout(duration, exception_class, *exception_args, summary:) ⇒ Object
- #update_handlers ⇒ Object
- #upsert_memo(hash) ⇒ Object
- #upsert_search_attributes(*updates) ⇒ Object
- #wait_condition(cancellation:) ⇒ Object
Constructor Details
#initialize(instance) ⇒ Context
Returns a new instance of Context.
20 21 22 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 20 def initialize(instance) @instance = instance end |
Instance Method Details
#_cancel_external_workflow(id:, run_id:) ⇒ Object
394 395 396 397 398 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 394 def _cancel_external_workflow(id:, run_id:) @outbound.cancel_external_workflow( Temporalio::Worker::Interceptor::Workflow::CancelExternalWorkflowInput.new(id:, run_id:) ) end |
#_outbound=(outbound) ⇒ Object
400 401 402 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 400 def _outbound=(outbound) @outbound = outbound end |
#_signal_child_workflow(id:, signal:, args:, cancellation:, arg_hints:) ⇒ Object
404 405 406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 404 def _signal_child_workflow(id:, signal:, args:, cancellation:, arg_hints:) signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @outbound.signal_child_workflow( Temporalio::Worker::Interceptor::Workflow::SignalChildWorkflowInput.new( id:, signal:, args:, cancellation:, arg_hints: arg_hints || defn_arg_hints, headers: {} ) ) end |
#_signal_external_workflow(id:, run_id:, signal:, args:, cancellation:, arg_hints:) ⇒ Object
418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 418 def _signal_external_workflow(id:, run_id:, signal:, args:, cancellation:, arg_hints:) signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @outbound.signal_external_workflow( Temporalio::Worker::Interceptor::Workflow::SignalExternalWorkflowInput.new( id:, run_id:, signal:, args:, cancellation:, arg_hints: arg_hints || defn_arg_hints, headers: {} ) ) end |
#all_handlers_finished? ⇒ Boolean
24 25 26 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 24 def all_handlers_finished? @instance.in_progress_handlers.empty? end |
#cancellation ⇒ Object
28 29 30 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 28 def cancellation @instance.cancellation end |
#continue_as_new_suggested ⇒ Object
32 33 34 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 32 def continue_as_new_suggested @instance.continue_as_new_suggested end |
#create_nexus_client(endpoint:, service:) ⇒ Object
36 37 38 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 36 def create_nexus_client(endpoint:, service:) NexusClient.new(endpoint:, service:, outbound: @outbound) end |
#current_deployment_version ⇒ Object
50 51 52 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 50 def current_deployment_version @instance.current_deployment_version end |
#current_details ⇒ Object
40 41 42 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 40 def current_details @instance.current_details || '' end |
#current_details=(details) ⇒ Object
44 45 46 47 48 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 44 def current_details=(details) raise 'Details must be a String' unless details.nil? || details.is_a?(String) @instance.current_details = (details || '') end |
#current_history_length ⇒ Object
54 55 56 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 54 def current_history_length @instance.current_history_length end |
#current_history_size ⇒ Object
58 59 60 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 58 def current_history_size @instance.current_history_size end |
#current_update_info ⇒ Object
62 63 64 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 62 def current_update_info Fiber[:__temporal_update_info] end |
#deprecate_patch(patch_id) ⇒ Object
66 67 68 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 66 def deprecate_patch(patch_id) @instance.patch(patch_id:, deprecated: true) end |
#durable_scheduler_disabled ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 70 def durable_scheduler_disabled(&) prev = Fiber.scheduler # Imply illegal call tracing disabled illegal_call_tracing_disabled do Fiber.set_scheduler(nil) yield ensure Fiber.set_scheduler(prev) end end |
#execute_activity(activity, *args, task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:, priority:, arg_hints:, result_hint:) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 81 def execute_activity( activity, *args, task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:, priority:, arg_hints:, result_hint: ) activity, defn_arg_hints, defn_result_hint = case activity when Class defn = Activity::Definition::Info.from_activity(activity) [defn.name&.to_s, defn.arg_hints, defn.result_hint] when Symbol, String [activity.to_s, nil, nil] else raise ArgumentError, 'Activity must be a definition class, or a symbol/string' end raise 'Cannot invoke dynamic activities' unless activity @outbound.execute_activity( Temporalio::Worker::Interceptor::Workflow::ExecuteActivityInput.new( activity:, args:, task_queue: task_queue || info.task_queue, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution, priority:, arg_hints: arg_hints || defn_arg_hints, result_hint: result_hint || defn_result_hint, headers: {} ) ) end |
#execute_local_activity(activity, *args, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:, arg_hints:, result_hint:) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 135 def execute_local_activity( activity, *args, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:, arg_hints:, result_hint: ) activity, defn_arg_hints, defn_result_hint = case activity when Class defn = Activity::Definition::Info.from_activity(activity) [defn.name&.to_s, defn.arg_hints, defn.result_hint] when Symbol, String [activity.to_s, nil, nil] else raise ArgumentError, 'Activity must be a definition class, or a symbol/string' end raise 'Cannot invoke dynamic activities' unless activity @outbound.execute_local_activity( Temporalio::Worker::Interceptor::Workflow::ExecuteLocalActivityInput.new( activity:, args:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id:, arg_hints: arg_hints || defn_arg_hints, result_hint: result_hint || defn_result_hint, headers: {} ) ) end |
#external_workflow_handle(workflow_id, run_id: nil) ⇒ Object
182 183 184 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 182 def external_workflow_handle(workflow_id, run_id: nil) ExternalWorkflowHandle.new(id: workflow_id, run_id:, instance: @instance) end |
#illegal_call_tracing_disabled ⇒ Object
186 187 188 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 186 def illegal_call_tracing_disabled(&) @instance.illegal_call_tracing_disabled(&) end |
#info ⇒ Object
190 191 192 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 190 def info @instance.info end |
#initialize_continue_as_new_error(error) ⇒ Object
198 199 200 201 202 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 198 def initialize_continue_as_new_error(error) @outbound.initialize_continue_as_new_error( Temporalio::Worker::Interceptor::Workflow::InitializeContinueAsNewErrorInput.new(error:) ) end |
#instance ⇒ Object
194 195 196 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 194 def instance @instance.instance end |
#io_enabled ⇒ Object
204 205 206 207 208 209 210 211 212 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 204 def io_enabled(&) prev = @instance.io_enabled @instance.io_enabled = true begin yield ensure @instance.io_enabled = prev end end |
#logger ⇒ Object
214 215 216 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 214 def logger @instance.logger end |
#memo ⇒ Object
218 219 220 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 218 def memo @instance.memo end |
#metric_meter ⇒ Object
222 223 224 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 222 def metric_meter @instance.metric_meter end |
#now ⇒ Object
226 227 228 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 226 def now @instance.now end |
#patched(patch_id) ⇒ Object
230 231 232 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 230 def patched(patch_id) @instance.patch(patch_id:, deprecated: false) end |
#payload_converter ⇒ Object
234 235 236 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 234 def payload_converter @instance.payload_converter end |
#query_handlers ⇒ Object
238 239 240 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 238 def query_handlers @instance.query_handlers end |
#random ⇒ Object
242 243 244 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 242 def random @instance.random end |
#replaying? ⇒ Boolean
246 247 248 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 246 def @instance. end |
#replaying_history_events? ⇒ Boolean
250 251 252 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 250 def @instance. && !@instance.in_query_or_validator end |
#search_attributes ⇒ Object
254 255 256 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 254 def search_attributes @instance.search_attributes end |
#signal_handlers ⇒ Object
258 259 260 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 258 def signal_handlers @instance.signal_handlers end |
#sleep(duration, summary:, cancellation:) ⇒ Object
262 263 264 265 266 267 268 269 270 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 262 def sleep(duration, summary:, cancellation:) @outbound.sleep( Temporalio::Worker::Interceptor::Workflow::SleepInput.new( duration:, summary:, cancellation: ) ) end |
#start_child_workflow(workflow, *args, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints:, result_hint:) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 272 def start_child_workflow( workflow, *args, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints:, result_hint: ) workflow, defn_arg_hints, defn_result_hint = Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow) @outbound.start_child_workflow( Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new( workflow:, args:, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints: arg_hints || defn_arg_hints, result_hint: result_hint || defn_result_hint, headers: {} ) ) end |
#storage ⇒ Object
323 324 325 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 323 def storage @storage ||= {} end |
#timeout(duration, exception_class, *exception_args, summary:) ⇒ Object
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 327 def timeout(duration, exception_class, *exception_args, summary:, &) raise 'Block required for timeout' unless block_given? # Run timer in background and block in foreground. This gives better stack traces than a future any-of race. # We make a detached cancellation because we don't want to link to workflow cancellation. sleep_cancel, sleep_cancel_proc = Cancellation.new fiber = Fiber.current Workflow::Future.new do Workflow.sleep(duration, summary:, cancellation: sleep_cancel) fiber.raise(exception_class, *exception_args) if fiber.alive? # steep:ignore rescue Exception => e # rubocop:disable Lint/RescueException # Re-raise in fiber fiber.raise(e) if fiber.alive? end begin yield ensure sleep_cancel_proc.call end end |
#update_handlers ⇒ Object
349 350 351 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 349 def update_handlers @instance.update_handlers end |
#upsert_memo(hash) ⇒ Object
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 353 def upsert_memo(hash) # Convert to memo, apply updates, then add the command (so command adding is post validation) upserted_memo = ProtoUtils.memo_to_proto(hash, payload_converter) memo._update do |new_hash| hash.each do |key, val| # Nil means delete if val.nil? new_hash.delete(key.to_s) else new_hash[key.to_s] = val end end end @instance.add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( modify_workflow_properties: Bridge::Api::WorkflowCommands::ModifyWorkflowProperties.new( upserted_memo: ) ) ) end |
#upsert_search_attributes(*updates) ⇒ Object
375 376 377 378 379 380 381 382 383 384 385 386 387 388 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 375 def upsert_search_attributes(*updates) # Apply updates then add the command (so command adding is post validation) search_attributes._disable_mutations = false search_attributes.update!(*updates) @instance.add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( upsert_workflow_search_attributes: Bridge::Api::WorkflowCommands::UpsertWorkflowSearchAttributes.new( search_attributes: updates.to_h(&:_to_proto_pair) ) ) ) ensure search_attributes._disable_mutations = true end |
#wait_condition(cancellation:) ⇒ Object
390 391 392 |
# File 'lib/temporalio/internal/worker/workflow_instance/context.rb', line 390 def wait_condition(cancellation:, &) @instance.scheduler.wait_condition(cancellation:, &) end |