Class: Temporalio::Worker

Inherits:
  • Object
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/plugin.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/poller_behavior.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/deployment_options.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb,
lib/temporalio/worker/illegal_workflow_call_validator.rb

Overview

Worker for processing activities and workflows on a task queue.

Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.
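As a rough illustration of the flow described above, the following sketch wraps worker creation and #run in a helper method. The helper name run_example_worker, the task queue name, the host and namespace defaults, and the choice of SIGINT are assumptions made for illustration only. The requires are deferred into the method body so that the definition itself loads even where the temporalio gem is not installed.

```ruby
# Hypothetical helper: builds one worker and blocks until SIGINT arrives.
# `workflows` and `activities` are whatever definition classes the caller owns.
def run_example_worker(workflows:, activities:, target_host: 'localhost:7233', namespace: 'default')
  # Deferred so that defining this method does not require the gem.
  require 'temporalio/client'
  require 'temporalio/worker'

  client = Temporalio::Client.connect(target_host, namespace)
  worker = Temporalio::Worker.new(
    client: client,
    task_queue: 'example-task-queue', # assumed name
    workflows: workflows,
    activities: activities
  )

  # With no block given, #run returns only after shutdown, here triggered by SIGINT.
  worker.run(shutdown_signals: ['SIGINT'])
end
```

Calling the helper needs a reachable Temporal server and at least one workflow or activity, since the constructor requires one to be present.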

Defined Under Namespace

Modules: Interceptor, Plugin

Classes: ActivityExecutor, DeploymentOptions, IllegalWorkflowCallValidator, Options, PollerBehavior, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer

Constructor Details

#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker

Create a new worker. At least one activity or workflow must be present.

Parameters:

  • client (Client)

    Client for this worker.

  • task_queue (String)

    Task queue for this worker.

  • activities (Array<Activity::Definition, Class<Activity::Definition>, Activity::Definition::Info>) (defaults to: [])

    Activities for this worker.

  • workflows (Array<Class<Workflow::Definition>>) (defaults to: [])

    Workflows for this worker.

  • tuner (Tuner) (defaults to: Tuner.create_fixed)

    Tuner that controls the amount of concurrent activities/workflows that run at a time.

  • activity_executors (Hash<Symbol, Worker::ActivityExecutor>) (defaults to: ActivityExecutor.defaults)

    Executors that activities can run within.

  • workflow_executor (WorkflowExecutor) (defaults to: WorkflowExecutor::ThreadPool.default)

    Workflow executor that workflow tasks run within. This must be a Temporalio::Worker::WorkflowExecutor::ThreadPool currently.

  • plugins (Array<Plugin>) (defaults to: [])

    Plugins to use for configuring workers and intercepting the running of workers. Any plugins that were set on the client that include Plugin will automatically be applied to the worker and should not be configured explicitly via this option. WARNING: Plugins are experimental.

  • interceptors (Array<Interceptor::Activity, Interceptor::Workflow>) (defaults to: [])

    Interceptors specific to this worker. Note, interceptors set on the client that include the Temporalio::Worker::Interceptor::Activity or Temporalio::Worker::Interceptor::Workflow module are automatically included here, so no need to specify them again.

  • identity (String, nil) (defaults to: nil)

    Override the identity for this worker. If unset, client identity is used.

  • logger (Logger) (defaults to: client.options.logger)

    Logger to override client logger with. Default is the client logger.

  • max_cached_workflows (Integer) (defaults to: 1000)

    Number of workflows held in cache for use by sticky task queue. If set to 0, workflow caching and sticky queuing are disabled.

  • max_concurrent_workflow_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll workflow task requests we will perform at a time on this worker’s task queue.

  • nonsticky_to_sticky_poll_ratio (Float) (defaults to: 0.2)

    Fraction of max_concurrent_workflow_task_polls allotted to the nonsticky queue when sticky tasks are enabled: max_concurrent_workflow_task_polls * this number is the maximum number of pollers allowed for the nonsticky queue. If both defaults are used, the sticky queue will allow at most 4 pollers while the nonsticky queue will allow 1. The minimum for either poller is 1, so if max_concurrent_workflow_task_polls is 1 and sticky queues are enabled, there will be 2 concurrent polls.

  • max_concurrent_activity_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll activity task requests we will perform at a time on this worker’s task queue.

  • no_remote_activities (Boolean) (defaults to: false)

    If true, this worker will only handle workflow tasks and local activities; it will not poll for activity tasks.

  • sticky_queue_schedule_to_start_timeout (Float) (defaults to: 10)

    How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.

  • max_heartbeat_throttle_interval (Float) (defaults to: 60)

    Longest interval for throttling activity heartbeats.

  • default_heartbeat_throttle_interval (Float) (defaults to: 30)

    Default interval for throttling activity heartbeats when the per-activity heartbeat timeout is unset. When it is set, the throttle interval is the per-activity heartbeat timeout * 0.8.

  • max_activities_per_second (Float, nil) (defaults to: nil)

    Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.

  • max_task_queue_activities_per_second (Float, nil) (defaults to: nil)

    Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.

  • graceful_shutdown_period (Float) (defaults to: 0)

    Amount of time after shutdown is called that activities are given to complete before their tasks are canceled.

  • disable_eager_activity_execution (Boolean) (defaults to: false)

    If true, disables eager activity execution. Eager activity execution is an optimization on some servers that sends activities back to the same worker as the calling workflow if they can run there. This should be set to true for max_task_queue_activities_per_second to work and in a future version of this API may be implied as such (i.e. this setting will be ignored if that setting is set).

  • illegal_workflow_calls (Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>) (defaults to: Worker.default_illegal_workflow_calls)

    Set of illegal workflow calls that are considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string class name (no leading ::). The value can be :all, which means any use of the class is illegal. The value can also be an array of symbols/validators for methods on the class that cannot be used. The methods refer to either instance or class methods; there is no way to differentiate at this time. Symbol method names are the normal way to say a method cannot be used; validators are only for advanced situations. Finally, for advanced situations, the hash value can be a class-level validator that is not tied to a specific method.

  • workflow_failure_exception_types (Array<Class<Exception>>) (defaults to: [])

    Workflow failure exception types. This is the set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to the workflow_failure_exception_type on the workflow definition class itself. If Exception is set, it effectively will fail a workflow/update in all user exception cases.

  • workflow_payload_codec_thread_pool (ThreadPool, nil) (defaults to: nil)

    Thread pool to run payload codec encode/decode within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block execution which is why they need to be run in the background.

  • unsafe_workflow_io_enabled (Boolean) (defaults to: false)

    If false, the default, workflow code that invokes io_wait on the fiber scheduler will fail. Instead of setting this to true, users are encouraged to use Temporalio::Workflow::Unsafe.io_enabled with a block for narrower enabling of IO.

  • deployment_options (DeploymentOptions, nil) (defaults to: Worker.default_deployment_options)

    Deployment options for the worker. WARNING: This is an experimental feature and may change in the future.

  • workflow_task_poller_behavior (PollerBehavior) (defaults to: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls))

    Specify the behavior of workflow task polling. Defaults to a simple maximum of max_concurrent_workflow_task_polls pollers (5 by default).

  • activity_task_poller_behavior (PollerBehavior) (defaults to: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls))

    Specify the behavior of activity task polling. Defaults to a simple maximum of max_concurrent_activity_task_polls pollers (5 by default).

  • debug_mode (Boolean) (defaults to: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase))

    If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks if they block the thread for too long. This defaults to true if the TEMPORAL_DEBUG environment variable is true or 1.
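The numeric rules in the parameter list above can be sketched in plain Ruby. The helper names are hypothetical and none of this is the SDK's internal code. Two details are assumptions rather than documented behavior: that the poller product is rounded down, and that the 0.8 * timeout throttle is capped by max_heartbeat_throttle_interval. The debug_mode expression is copied from the constructor default.

```ruby
# Split workflow task pollers between the nonsticky and sticky queues.
# Assumption: the product is rounded down; each queue keeps at least 1 poller.
def workflow_poller_split(max_polls: 5, nonsticky_ratio: 0.2)
  nonsticky = [(max_polls * nonsticky_ratio).floor, 1].max
  sticky = [max_polls - nonsticky, 1].max
  { nonsticky: nonsticky, sticky: sticky }
end

# Heartbeat throttle interval in seconds.
# Assumption: the 0.8 * timeout value is capped at the maximum interval.
def heartbeat_throttle_interval(heartbeat_timeout: nil, default_interval: 30.0, max_interval: 60.0)
  return default_interval if heartbeat_timeout.nil?

  [heartbeat_timeout * 0.8, max_interval].min
end

# Same expression as the debug_mode default, with the environment passed in.
def debug_mode_default(env = ENV)
  %w[true 1].include?(env['TEMPORAL_DEBUG'].to_s.downcase)
end

p workflow_poller_split                 # defaults: 1 nonsticky poller, 4 sticky
p workflow_poller_split(max_polls: 1)   # minimum of 1 each, so 2 polls in total
p heartbeat_throttle_interval(heartbeat_timeout: 10.0)
p debug_mode_default({ 'TEMPORAL_DEBUG' => 'TRUE' })
```

Because of the downcase call, TEMPORAL_DEBUG=TRUE and TEMPORAL_DEBUG=True both enable debug mode, while values such as yes do not.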



# File 'lib/temporalio/worker.rb', line 436

def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  plugins: [],
  interceptors: [],
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  unsafe_workflow_io_enabled: false,
  deployment_options: Worker.default_deployment_options,
  workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
  activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  Internal::ProtoUtils.assert_non_reserved_name(task_queue)

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    plugins:,
    interceptors:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    unsafe_workflow_io_enabled:,
    deployment_options:,
    workflow_task_poller_behavior:,
    activity_task_poller_behavior:,
    debug_mode:
  ).freeze
  # Collect applicable client plugins and worker plugins, then validate and apply to options
  @plugins = client.options.plugins.grep(Plugin) + plugins
  Worker._validate_plugins!(@plugins)
  @options = @plugins.reduce(@options) { |options, plugin| plugin.configure_worker(options) }
  # Initialize the worker for the given options
  _initialize_from_options
end
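
The reduce call near the end of the constructor threads the options through every plugin in order, with client plugins ahead of worker plugins. A minimal sketch of that shape follows, using stand-ins: the two plugin classes are hypothetical, and a frozen Hash stands in for the real Options object.

```ruby
# Stand-in plugins: each returns a new options hash rather than mutating.
class DisableCachePlugin
  def configure_worker(options)
    options.merge(max_cached_workflows: 0).freeze
  end
end

class IdentityPlugin
  def configure_worker(options)
    options.merge(identity: "#{options[:task_queue]}-worker").freeze
  end
end

base_options = { task_queue: 'example-task-queue', max_cached_workflows: 1000, identity: nil }.freeze
plugins = [DisableCachePlugin.new, IdentityPlugin.new]

# Same reduce shape as the constructor: each plugin sees the previous plugin's result.
final_options = plugins.reduce(base_options) { |options, plugin| plugin.configure_worker(options) }

p final_options
```

Since every step returns a fresh value, the original options stay untouched and a later plugin can observe or override what an earlier one set.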

Instance Attribute Details

#options ⇒ Options (readonly)

Returns Options for this worker, which has the same attributes as #initialize.

Returns:

  • (Options)

    Options for this worker, which has the same attributes as #initialize.



# File 'lib/temporalio/worker.rb', line 356

def options
  @options
end

Class Method Details

.default_build_id ⇒ String

Returns Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in $LOADED_FEATURES. Users may prefer to set the build ID to a better representation of the source.

Returns:

  • (String)

    Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in $LOADED_FEATURES. Users may prefer to set the build ID to a better representation of the source.



# File 'lib/temporalio/worker.rb', line 73

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end
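
The private _load_default_build_id helper is not shown on this page. As a sketch of the general idea only, the following computes a memoized checksum over loaded Ruby source files. The module name, the SHA-256 digest, the sort order, and the .rb filter are all assumptions and may differ from what the SDK does.

```ruby
require 'digest'

# Hypothetical stand-in, not the SDK implementation.
module BuildIdSketch
  # Memoized: the checksum is computed once, on first use.
  def self.default_build_id
    @default_build_id ||= compute_build_id($LOADED_FEATURES)
  end

  # Hash the contents of every loaded .rb file that exists on disk.
  def self.compute_build_id(paths)
    digest = Digest::SHA256.new
    paths.sort.each do |path|
      next unless path.end_with?('.rb') && File.file?(path)

      digest.update(File.binread(path))
    end
    digest.hexdigest
  end
end

puts BuildIdSketch.default_build_id
```

A checksum of this kind changes whenever any loaded file changes, which is why an explicit build ID can be a clearer representation of the deployed source.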

.default_deployment_options ⇒ DeploymentOptions

Returns Default deployment options, which do not use worker versioning or a deployment name, and which set the build ID to the one from Worker.default_build_id.

Returns:

  • (DeploymentOptions)

    Default deployment options, which do not use worker versioning or a deployment name, and which set the build ID to the one from Worker.default_build_id.



# File 'lib/temporalio/worker.rb', line 100

def self.default_deployment_options
  @default_deployment_options ||= DeploymentOptions.new(
    version: WorkerDeploymentVersion.new(deployment_name: '', build_id: Worker.default_build_id)
  )
end

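.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>

Returns Default, immutable set of illegal calls used for the illegal_workflow_calls worker option. See the documentation of that option for more details.

Returns:

  • (Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>)

    Default, immutable set of illegal calls used for the illegal_workflow_calls worker option. See the documentation of that option for more details.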



# File 'lib/temporalio/worker.rb', line 288

def self.default_illegal_workflow_calls
  @default_illegal_workflow_calls ||= begin
    hash = {
      'BasicSocket' => :all,
      'Date' => %i[initialize today],
      'DateTime' => %i[initialize now],
      'Dir' => :all,
      'Fiber' => [:set_scheduler],
      'File' => :all,
      'FileTest' => :all,
      'FileUtils' => :all,
      'Find' => :all,
      'GC' => :all,
      'IO' => [
        :read
        # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
        # trying to derive whether it's file vs stdout write.
        #:write
      ],
      'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                     sleep spawn srand system test trap],
      # Loggers use mutexes in ways that can hang workflows, so users need to disable the durable scheduler to use
      # them
      'Logger' => :all,
      'Monitor' => :all,
      'Net::HTTP' => :all,
      'Pathname' => :all,
      # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
      # test executing activities inside a timeout will fail if clock_gettime is blocked.
      'Process' => %i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                      spawn times wait wait2 waitall warmup],
      # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
      # 'Ractor' => :all,
      'Random::Base' => [:initialize],
      'Resolv' => :all,
      'SecureRandom' => :all,
      'Signal' => :all,
      'Socket' => :all,
      'Tempfile' => :all,
      'Timeout' => :all,
      'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                     pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                     terminate thread_variable_set wakeup],
      'Thread::ConditionVariable' => :all,
      'Thread::Mutex' => IllegalWorkflowCallValidator.known_safe_mutex_validator,
      'Thread::SizedQueue' => :all,
      'Thread::Queue' => :all,
      'Time' => IllegalWorkflowCallValidator.default_time_validators
    } #: Hash[String, :all | Array[Symbol]]
    hash.each_value(&:freeze)
    hash.freeze
  end
end
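
Because the default hash is frozen, customizing it means building a new hash. The sketch below uses a small stand-in with the same value shapes (validators are left out), a hypothetical illegal_call? lookup that is not part of the SDK, and a made-up class name MyApp::Clock.

```ruby
# Stand-in for a few entries of the frozen default hash (validators omitted).
default_illegal_calls = {
  'Dir' => :all,
  'Date' => %i[initialize today],
  'IO' => [:read]
}.each_value(&:freeze).freeze

# Hypothetical lookup: is calling `method_name` on `class_name` disallowed?
def illegal_call?(calls, class_name, method_name)
  rule = calls[class_name]
  case rule
  when :all then true                             # any use of the class is illegal
  when Array then rule.include?(method_name)      # only the listed methods are illegal
  else false                                      # class not listed
  end
end

# merge returns a new, unfrozen hash, so the defaults stay untouched.
custom_calls = default_illegal_calls.merge('MyApp::Clock' => :all)

p illegal_call?(custom_calls, 'Date', :today)
p illegal_call?(custom_calls, 'IO', :write)
```

With the SDK itself, the same approach would start from Worker.default_illegal_workflow_calls and pass the merged hash as the illegal_workflow_calls option.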

.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run all workers until the cancellation is canceled or the optional block completes. When either happens, the workers are shut down. This will return the block result if everything is successful or raise an error if not. See #run for details on how worker shutdown works.

Parameters:

  • workers (Array<Worker>)

    Workers to run.

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down all workers.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, raise is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Workers will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.

Returns:

  • (Object)

    Return value of the block or nil if no block given.



# File 'lib/temporalio/worker.rb', line 120

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # We have to apply plugins. However, every plugin has a different worker. So we provide them the worker along with
  # other options, but we disregard any mutation of the worker on the next call.
  run_worker = proc do |options|
    # @type var options: Plugin::RunWorkerOptions
    _run_all_root(*workers,
                  cancellation: options.cancellation,
                  shutdown_signals: options.shutdown_signals,
                  raise_in_block_on_shutdown: options.raise_in_block_on_shutdown,
                  wait_block_complete:, &block)
  end
  plugins_with_workers = workers.flat_map { |w| w._plugins.map { |p| [p, w] } }
  run_worker = plugins_with_workers.reverse_each.reduce(run_worker) do |next_call, plugin_with_worker|
    plugin, worker = plugin_with_worker
    proc do |options|
      plugin.run_worker(options.with(worker:), next_call) # steep:ignore
    end
  end

  run_worker.call(Plugin::RunWorkerOptions.new(
    # Intentionally violating typing here because we set this on each call
    worker: nil, # steep:ignore
    cancellation:,
    shutdown_signals:,
    raise_in_block_on_shutdown:
  ))
end
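
The reverse_each.reduce in this method builds a chain of nested procs in which the first plugin ends up outermost. A plain-Ruby sketch of that wrapping order follows. TracingPlugin and the options hash are stand-ins invented for illustration; only the run_worker(options, next_call) shape is taken from the source above.

```ruby
# Stand-in plugin that records when it runs relative to the next call.
class TracingPlugin
  def initialize(name, trace)
    @name = name
    @trace = trace
  end

  def run_worker(options, next_call)
    @trace << "#{@name}:before"
    result = next_call.call(options)
    @trace << "#{@name}:after"
    result
  end
end

trace = []

# Innermost call, standing in for the code that actually runs the workers.
root = proc do |options|
  trace << "root(#{options[:label]})"
  :done
end

plugins = [TracingPlugin.new('a', trace), TracingPlugin.new('b', trace)]

# Wrap from the last plugin inward so that the first plugin is the outermost layer.
chain = plugins.reverse_each.reduce(root) do |next_call, plugin|
  proc { |options| plugin.run_worker(options, next_call) }
end

result = chain.call({ label: 'workers' })
puts trace.join(' -> ')
```

Each layer decides whether and when to invoke next_call, so a plugin can run code before startup, after shutdown, or both.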

Instance Method Details

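#client ⇒ Client

Returns Client for this worker. This is the same as Temporalio::Worker::Options#client in #options, but surrounded by a mutex to be safe for client replacement in #client=.

Returns:

  • (Client)

    Client for this worker. This is the same as Temporalio::Worker::Options#client in #options, but surrounded by a mutex to be safe for client replacement in #client=.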



# File 'lib/temporalio/worker.rb', line 612

def client
  @client_mutex.synchronize { @options.client }
end

#client=(new_client) ⇒ Object

Replace the worker’s client. When this is called, the client is replaced on the internal worker which means any new calls will be made on the new client (but existing calls will still complete on the previous one). This is commonly used for providing a new client with updated authentication credentials.

Parameters:

  • new_client (Client)

    New client to use for new calls.



# File 'lib/temporalio/worker.rb', line 621

def client=(new_client)
  @client_mutex.synchronize do
    @bridge_worker.replace_client(new_client.connection._core_client)
    @options = @options.with(client: new_client)
    new_client
  end
end
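
The reader and writer share one mutex, so a read never observes a half-finished replacement. A minimal stand-in for that pattern follows. ClientHolder and the symbol placeholders are hypothetical, and the bridge-worker call from the real method is left out.

```ruby
# Stand-in holder mirroring the reader/writer pair: both go through one mutex.
class ClientHolder
  def initialize(client)
    @client_mutex = Mutex.new
    @client = client
  end

  def client
    @client_mutex.synchronize { @client }
  end

  def client=(new_client)
    @client_mutex.synchronize { @client = new_client }
  end
end

holder = ClientHolder.new(:client_with_old_credentials)

# A caller that fetched the client earlier keeps its reference.
in_flight = holder.client

# New reads see the replacement.
holder.client = :client_with_new_credentials

p in_flight
p holder.client
```

On a real worker the equivalent call is worker.client = new_client, where new_client is a Client built with the refreshed credentials.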

#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run this worker until the cancellation is canceled or the optional block completes. When either happens, the worker is shut down. This will return the block result if everything is successful or raise an error if not.

Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.

Parameters:

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down this worker.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, raise is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Worker will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.

Returns:

  • (Object)

    Return value of the block or nil if no block given.



# File 'lib/temporalio/worker.rb', line 645

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end

#task_queue ⇒ String

Returns Task queue set on the worker options.

Returns:

  • (String)

    Task queue set on the worker options.



# File 'lib/temporalio/worker.rb', line 606

def task_queue
  @options.task_queue
end