Class: Temporalio::Worker
- Inherits:
- Object
- Temporalio::Worker
- Defined in:
- lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/plugin.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/poller_behavior.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/deployment_options.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb,
lib/temporalio/worker/illegal_workflow_call_validator.rb
Overview
Worker for processing activities and workflows on a task queue.
Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.
Defined Under Namespace
Modules: Interceptor, Plugin Classes: ActivityExecutor, DeploymentOptions, IllegalWorkflowCallValidator, Options, PollerBehavior, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer
Instance Attribute Summary
-
#options ⇒ Options
readonly
Options for this worker, which have the same attributes as the arguments to #initialize.
Class Method Summary
-
.default_build_id ⇒ String
Memoized default build ID.
-
.default_deployment_options ⇒ DeploymentOptions
Default deployment options, which do not use worker versioning or a deployment name, and set the build ID to the one from .default_build_id.
-
.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>
Default, immutable set of illegal calls used for the illegal_workflow_calls worker option.
-
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes.
Instance Method Summary
-
#client ⇒ Client
Client for this worker.
-
#client=(new_client) ⇒ Object
Replace the worker’s client.
-
#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker
constructor
Create a new worker.
-
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes.
-
#task_queue ⇒ String
Task queue set on the worker options.
Constructor Details
#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker
Create a new worker. At least one activity or workflow must be present.
# File 'lib/temporalio/worker.rb', line 436

def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  plugins: [],
  interceptors: [],
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  unsafe_workflow_io_enabled: false,
  deployment_options: Worker.default_deployment_options,
  workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
  activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  Internal::ProtoUtils.assert_non_reserved_name(task_queue)
  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    plugins:,
    interceptors:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    unsafe_workflow_io_enabled:,
    deployment_options:,
    workflow_task_poller_behavior:,
    activity_task_poller_behavior:,
    debug_mode:
  ).freeze

  # Collect applicable client plugins and worker plugins, then validate and apply to options
  @plugins = client.options.plugins.grep(Plugin) + plugins
  Worker._validate_plugins!(@plugins)
  @options = @plugins.reduce(@options) { |options, plugin| plugin.configure_worker(options) }

  # Initialize the worker for the given options
end
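The debug_mode default in the signature above is computed from the TEMPORAL_DEBUG environment variable. The following is a minimal sketch of how that expression evaluates for a few inputs. The helper name debug_mode_from is hypothetical and exists only for this illustration; the expression inside it is copied from the signature.

```ruby
# Hypothetical helper (not part of the SDK): wraps the default expression from
# the #initialize signature so it can be tried against arbitrary values.
def debug_mode_from(env_value)
  # nil.to_s is "", so an unset variable yields false.
  %w[true 1].include?(env_value.to_s.downcase)
end

DEBUG_EXAMPLES = {
  'true' => debug_mode_from('true'),
  'TRUE' => debug_mode_from('TRUE'), # matching is case-insensitive
  '1' => debug_mode_from('1'),
  'yes' => debug_mode_from('yes'),   # only "true" and "1" are accepted
  'unset' => debug_mode_from(nil)
}.freeze

DEBUG_EXAMPLES.each { |label, enabled| puts "#{label}: #{enabled}" }
```

Only the strings "true" (in any letter case) and "1" enable debug mode under this expression; anything else, including an unset variable, leaves it off.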
Instance Attribute Details
#options ⇒ Options (readonly)
Returns: Options for this worker, which have the same attributes as the arguments to #initialize.
# File 'lib/temporalio/worker.rb', line 356

def options
  @options
end
Class Method Details
.default_build_id ⇒ String
Returns: Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in $LOADED_FEATURES. Users may prefer to set the build ID to a better representation of the source.
# File 'lib/temporalio/worker.rb', line 73

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end
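The description above says the default build ID is a checksum over the loaded Ruby source files. The sketch below shows one way such a checksum could be computed with the standard library. It is an assumption-laden illustration, not the SDK's _load_default_build_id: the helper name sketch_build_id, the choice of SHA-256, and the decision to hash both path and contents are all hypothetical.

```ruby
require 'digest'

# Hypothetical helper (not the SDK implementation): digest every loaded .rb
# file that exists on disk, in a stable order, into one hex string.
def sketch_build_id(features = $LOADED_FEATURES)
  digest = Digest::SHA256.new
  features
    .select { |path| path.end_with?('.rb') && File.file?(path) }
    .sort
    .each do |path|
      digest.update(path)               # assumption: the path is part of the identity
      digest.update(File.binread(path)) # file contents
    end
  digest.hexdigest
end

# Memoize the value the same way the listing above does, with ||=.
def memoized_sketch_build_id
  @memoized_sketch_build_id ||= sketch_build_id
end

puts memoized_sketch_build_id
```

Because a checksum like this changes whenever any loaded file changes, including gem files, setting an explicit build ID (for example a release tag) may be the more stable choice, as the description suggests.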
.default_deployment_options ⇒ DeploymentOptions
Returns: Default deployment options, which do not use worker versioning or a deployment name, and set the build ID to the one from .default_build_id.
# File 'lib/temporalio/worker.rb', line 100

def self.default_deployment_options
  @default_deployment_options ||= DeploymentOptions.new(
    version: WorkerDeploymentVersion.new(deployment_name: '', build_id: Worker.default_build_id)
  )
end
.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>
Returns: Default, immutable set of illegal calls used for the illegal_workflow_calls worker option. See the documentation of that option for more details.
# File 'lib/temporalio/worker.rb', line 288

def self.default_illegal_workflow_calls
  @default_illegal_workflow_calls ||= begin
    hash = {
      'BasicSocket' => :all,
      'Date' => %i[initialize today],
      'DateTime' => %i[initialize now],
      'Dir' => :all,
      'Fiber' => [:set_scheduler],
      'File' => :all,
      'FileTest' => :all,
      'FileUtils' => :all,
      'Find' => :all,
      'GC' => :all,
      'IO' => [
        :read
        # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
        # trying to derive whether it's file vs stdout write.
        #:write
      ],
      'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                     sleep spawn srand system test trap],
      # Loggers use mutexes in ways that can hang workflows, so users need to disable the durable scheduler to use
      # them
      'Logger' => :all,
      'Monitor' => :all,
      'Net::HTTP' => :all,
      'Pathname' => :all,
      # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
      # test executing activities inside a timeout will fail if clock_gettime is blocked.
      'Process' => %i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                      spawn times wait wait2 waitall warmup],
      # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
      # 'Ractor' => :all,
      'Random::Base' => [:initialize],
      'Resolv' => :all,
      'SecureRandom' => :all,
      'Signal' => :all,
      'Socket' => :all,
      'Tempfile' => :all,
      'Timeout' => :all,
      'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                     pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                     terminate thread_variable_set wakeup],
      'Thread::ConditionVariable' => :all,
      'Thread::Mutex' => IllegalWorkflowCallValidator.known_safe_mutex_validator,
      'Thread::SizedQueue' => :all,
      'Thread::Queue' => :all,
      'Time' => IllegalWorkflowCallValidator.default_time_validators
    } #: Hash[String, :all | Array[Symbol]]
    hash.each_value(&:freeze)
    hash.freeze
  end
end
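The table above maps a class name either to :all or to a list of method names. As a rough illustration of how a table in that shape could be consulted, here is a minimal sketch. The constant SAMPLE_ILLEGAL_CALLS and the method illegal_call? are hypothetical and are not the SDK's enforcement mechanism; the sketch also ignores IllegalWorkflowCallValidator entries, which the real table may contain.

```ruby
# Hypothetical sample in the same shape as the default table:
# class name => :all, or an array of method-name symbols.
SAMPLE_ILLEGAL_CALLS = {
  'File' => :all,
  'Date' => %i[initialize today],
  'Fiber' => [:set_scheduler]
}.each_value(&:freeze).freeze

# Hypothetical lookup: true when the class/method pair is listed as illegal.
def illegal_call?(table, class_name, method_name)
  rule = table[class_name]
  return false if rule.nil? # class not listed at all

  rule == :all || rule.include?(method_name)
end

puts illegal_call?(SAMPLE_ILLEGAL_CALLS, 'File', :read)  # every File method is listed
puts illegal_call?(SAMPLE_ILLEGAL_CALLS, 'Date', :today) # listed explicitly
puts illegal_call?(SAMPLE_ILLEGAL_CALLS, 'Date', :parse) # not listed
```

Freezing both the values and the hash, as the listing above does, prevents one caller's mutation from leaking into a memoized default shared by every worker.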
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This will return the block result if everything is successful or raise an error if not. See #run for details on how worker shutdown works.
# File 'lib/temporalio/worker.rb', line 120

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # We have to apply plugins. However, every plugin has a different worker. So we provide them the worker along with
  # other options, but we disregard any mutation of the worker on the next call.
  run_worker = proc do |options|
    # @type var options: Plugin::RunWorkerOptions
    _run_all_root(*workers,
                  cancellation: options.cancellation,
                  shutdown_signals: options.shutdown_signals,
                  raise_in_block_on_shutdown: options.raise_in_block_on_shutdown,
                  wait_block_complete:, &block)
  end
  plugins_with_workers = workers.flat_map { |w| w._plugins.map { |p| [p, w] } }
  run_worker = plugins_with_workers.reverse_each.reduce(run_worker) do |next_call, plugin_with_worker|
    plugin, worker = plugin_with_worker
    proc do |options|
      plugin.run_worker(options.with(worker:), next_call) # steep:ignore
    end
  end

  run_worker.call(Plugin::RunWorkerOptions.new(
                    # Intentionally violating typing here because we set this on each call
                    worker: nil, # steep:ignore
                    cancellation:,
                    shutdown_signals:,
                    raise_in_block_on_shutdown:
                  ))
end
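The listing above wraps the innermost run call in one proc per plugin by reducing over the plugins in reverse, so the first plugin ends up outermost. The sketch below reproduces only that chaining technique with plain Ruby objects. TracingPlugin and the options hash are hypothetical stand-ins, not SDK types.

```ruby
# Hypothetical plugin: records when it is entered and left, and delegates to
# the next link in the chain in between.
class TracingPlugin
  def initialize(name)
    @name = name
  end

  def run_worker(options, next_call)
    options[:trace] << "enter #{@name}"
    result = next_call.call(options)
    options[:trace] << "leave #{@name}"
    result
  end
end

# Innermost link: stands in for the call that actually runs the workers.
innermost = proc do |options|
  options[:trace] << 'run workers'
  :done
end

plugins = [TracingPlugin.new('a'), TracingPlugin.new('b')]

# Reverse iteration means the last plugin wraps the innermost call first, so
# the first plugin in the list becomes the outermost wrapper.
chain = plugins.reverse_each.reduce(innermost) do |next_call, plugin|
  proc { |options| plugin.run_worker(options, next_call) }
end

TRACE = []
RESULT = chain.call({ trace: TRACE })
puts TRACE.join(' -> ')
```

In this sketch the plugins are entered in list order and left in reverse order, and the innermost result propagates back out unchanged unless a plugin chooses to alter it.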
Instance Method Details
#client ⇒ Client
Returns Client for this worker. This is the same as Temporalio::Worker::Options#client in #options, but surrounded by a mutex to be safe for client replacement in #client=.
# File 'lib/temporalio/worker.rb', line 612

def client
  @client_mutex.synchronize { @options.client }
end
#client=(new_client) ⇒ Object
Replace the worker’s client. When this is called, the client is replaced on the internal worker which means any new calls will be made on the new client (but existing calls will still complete on the previous one). This is commonly used for providing a new client with updated authentication credentials.
# File 'lib/temporalio/worker.rb', line 621

def client=(new_client)
  @client_mutex.synchronize do
    @bridge_worker.replace_client(new_client.connection._core_client)
    @options = @options.with(client: new_client)
    new_client
  end
end
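The reader and writer above share one mutex so that a read never observes a half-completed replacement. The following is a minimal sketch of that pattern in isolation. ClientHolder is a hypothetical class for illustration and uses symbols where the worker would hold real client objects.

```ruby
# Hypothetical holder (not an SDK class): guards a replaceable reference with
# a mutex so reads and writes from different threads do not interleave.
class ClientHolder
  def initialize(client)
    @client = client
    @client_mutex = Mutex.new
  end

  def client
    @client_mutex.synchronize { @client }
  end

  def client=(new_client)
    @client_mutex.synchronize { @client = new_client }
  end
end

HOLDER = ClientHolder.new(:client_with_old_credentials)

# Swap from another thread while the main thread keeps reading.
swapper = Thread.new { HOLDER.client = :client_with_new_credentials }
swapper.join

puts HOLDER.client
```

As the description of #client= notes, work already in flight on the previous client still completes on it; only calls made after the swap use the replacement.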
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This will return the block result if everything is successful or raise an error if not.
Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.
# File 'lib/temporalio/worker.rb', line 645

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end
#task_queue ⇒ String
Returns Task queue set on the worker options.
# File 'lib/temporalio/worker.rb', line 606

def task_queue
  @options.task_queue
end