Method: Temporalio::Worker#initialize

Defined in:
lib/temporalio/worker.rb

#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker

Create a new worker. At least one activity or workflow must be present.

Parameters:

  • client (Client)

    Client for this worker.

  • task_queue (String)

    Task queue for this worker.

  • activities (Array<Activity::Definition, Class<Activity::Definition>, Activity::Definition::Info>) (defaults to: [])

    Activities for this worker.

  • workflows (Array<Class<Workflow::Definition>>) (defaults to: [])

    Workflows for this worker.

  • tuner (Tuner) (defaults to: Tuner.create_fixed)

    Tuner that controls the amount of concurrent activities/workflows that run at a time.

  • activity_executors (Hash<Symbol, Worker::ActivityExecutor>) (defaults to: ActivityExecutor.defaults)

    Executors that activities can run within.

  • workflow_executor (WorkflowExecutor) (defaults to: WorkflowExecutor::ThreadPool.default)

    Workflow executor that workflow tasks run within. This must be a Temporalio::Worker::WorkflowExecutor::ThreadPool currently.

  • interceptors (Array<Interceptor::Activity, Interceptor::Workflow>) (defaults to: [])

    Interceptors specific to this worker. Note, interceptors set on the client that include the Temporalio::Worker::Interceptor::Activity or Temporalio::Worker::Interceptor::Workflow module are automatically included here, so no need to specify them again.

  • identity (String, nil) (defaults to: nil)

    Override the identity for this worker. If unset, client identity is used.

  • logger (Logger) (defaults to: client.options.logger)

    Logger to override client logger with. Default is the client logger.

  • max_cached_workflows (Integer) (defaults to: 1000)

    Number of workflows held in cache for use by sticky task queue. If set to 0, workflow caching and sticky queuing are disabled.

  • max_concurrent_workflow_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll workflow task requests we will perform at a time on this worker’s task queue.

  • nonsticky_to_sticky_poll_ratio (Float) (defaults to: 0.2)

    ‘max_concurrent_workflow_task_polls` * this number = the number of max pollers that will be allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow 4 max pollers while the nonsticky queue will allow one. The minimum for either poller is 1, so if `max_concurrent_workflow_task_polls` is 1 and sticky queues are enabled, there will be 2 concurrent polls.

  • max_concurrent_activity_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll activity task requests we will perform at a time on this worker’s task queue.

  • no_remote_activities (Boolean) (defaults to: false)

    If true, this worker will only handle workflow tasks and local activities, it will not poll for activity tasks.

  • sticky_queue_schedule_to_start_timeout (Float) (defaults to: 10)

    How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.

  • max_heartbeat_throttle_interval (Float) (defaults to: 60)

    Longest interval for throttling activity heartbeats.

  • default_heartbeat_throttle_interval (Float) (defaults to: 30)

    Default interval for throttling activity heartbeats in case per-activity heartbeat timeout is unset. Otherwise, it’s the per-activity heartbeat timeout * 0.8.

  • max_activities_per_second (Float, nil) (defaults to: nil)

    Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.

  • max_task_queue_activities_per_second (Float, nil) (defaults to: nil)

    Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.

  • graceful_shutdown_period (Float) (defaults to: 0)

    Amount of time after shutdown is called that activities are given to complete before their tasks are canceled.

  • disable_eager_activity_execution (Boolean) (defaults to: false)

    If true, disables eager activity execution. Eager activity execution is an optimization on some servers that sends activities back to the same worker as the calling workflow if they can run there. This should be set to true for ‘max_task_queue_activities_per_second` to work and in a future version of this API may be implied as such (i.e. this setting will be ignored if that setting is set).

  • illegal_workflow_calls (Hash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>) (defaults to: Worker.default_illegal_workflow_calls)

    Set of illegal workflow calls that are considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string class name (no leading ‘::`). The value can be `:all` which means any use of the class is illegal. The value can be an array of symbols/validators for methods on the class that cannot be used. The methods refer to either instance or class methods, there is no way to differentiate at this time. Symbol method names are the normal way to say the method cannot be used, validators are only for advanced situations. Finally, for advanced situations, the hash value can be a class-level validator that is not tied to a specific method.

  • workflow_failure_exception_types (Array<Class<Exception>>) (defaults to: [])

    Workflow failure exception types. This is the set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow via task failure. These are applied in addition to the ‘workflow_failure_exception_type` on the workflow definition class itself. If Exception is set, it effectively will fail a workflow/update in all user exception cases.

  • workflow_payload_codec_thread_pool (ThreadPool, nil) (defaults to: nil)

    Thread pool to run payload codec encode/decode within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block execution which is why they need to be run in the background.

  • unsafe_workflow_io_enabled (Boolean) (defaults to: false)

    If false, the default, workflow code that invokes io_wait on the fiber scheduler will fail. Instead of setting this to true, users are encouraged to use Temporalio::Workflow::Unsafe.io_enabled with a block for narrower enabling of IO.

  • deployment_options (DeploymentOptions, nil) (defaults to: Worker.default_deployment_options)

    Deployment options for the worker. WARNING: This is an experimental feature and may change in the future.

  • workflow_task_poller_behavior (PollerBehavior) (defaults to: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls))

    Specify the behavior of workflow task polling. Defaults to a 5-poller maximum.

  • activity_task_poller_behavior (PollerBehavior) (defaults to: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls))

    Specify the behavior of activity task polling. Defaults to a 5-poller maximum.

  • debug_mode (Boolean) (defaults to: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase))

    If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks if they block the thread for too long. This defaults to true if the ‘TEMPORAL_DEBUG` environment variable is `true` or `1`.

Raises:

  • (ArgumentError)


382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
# File 'lib/temporalio/worker.rb', line 382

def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  interceptors: [],
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  unsafe_workflow_io_enabled: false,
  deployment_options: Worker.default_deployment_options,
  workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
  activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?

  Internal::ProtoUtils.assert_non_reserved_name(task_queue)

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    interceptors:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    unsafe_workflow_io_enabled:,
    deployment_options:,
    workflow_task_poller_behavior:,
    activity_task_poller_behavior:,
    debug_mode:
  ).freeze

  should_enforce_versioning_behavior =
    deployment_options.use_worker_versioning &&
    deployment_options.default_versioning_behavior == VersioningBehavior::UNSPECIFIED
  # Preload workflow definitions and some workflow settings for the bridge
  workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(
    workflows,
    should_enforce_versioning_behavior: should_enforce_versioning_behavior
  )
  nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types =
    Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options(
      workflow_failure_exception_types:, workflow_definitions:
    )

  # Create the bridge worker
  @bridge_worker = Internal::Bridge::Worker.new(
    client.connection._core_client,
    Internal::Bridge::Worker::Options.new(
      namespace: client.namespace,
      task_queue:,
      tuner: tuner._to_bridge_options,
      identity_override: identity,
      max_cached_workflows:,
      workflow_task_poller_behavior: workflow_task_poller_behavior._to_bridge_options,
      nonsticky_to_sticky_poll_ratio:,
      activity_task_poller_behavior: activity_task_poller_behavior._to_bridge_options,
      enable_workflows: !workflows.empty?,
      enable_local_activities: !workflows.empty? && !activities.empty?,
      enable_remote_activities: !activities.empty? && !no_remote_activities,
      enable_nexus: false,
      sticky_queue_schedule_to_start_timeout:,
      max_heartbeat_throttle_interval:,
      default_heartbeat_throttle_interval:,
      max_worker_activities_per_second: max_activities_per_second,
      max_task_queue_activities_per_second:,
      graceful_shutdown_period:,
      nondeterminism_as_workflow_fail:,
      nondeterminism_as_workflow_fail_for_types:,
      deployment_options: deployment_options._to_bridge_options
    )
  )

  # Collect interceptors from client and params
  @activity_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Activity)
  end
  @workflow_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Workflow)
  end

  # Cancellation for the whole worker
  @worker_shutdown_cancellation = Cancellation.new

  # Create workers
  unless activities.empty?
    @activity_worker = Internal::Worker::ActivityWorker.new(worker: self,
                                                            bridge_worker: @bridge_worker)
  end
  unless workflows.empty?
    @workflow_worker = Internal::Worker::WorkflowWorker.new(
      bridge_worker: @bridge_worker,
      namespace: client.namespace,
      task_queue:,
      workflow_definitions:,
      workflow_executor:,
      logger:,
      data_converter: client.data_converter,
      metric_meter: client.connection.options.runtime.metric_meter,
      workflow_interceptors: @workflow_interceptors,
      disable_eager_activity_execution:,
      illegal_workflow_calls:,
      workflow_failure_exception_types:,
      workflow_payload_codec_thread_pool:,
      unsafe_workflow_io_enabled:,
      debug_mode:,
      assert_valid_local_activity: ->(activity) { _assert_valid_local_activity(activity) }
    )
  end

  # Validate worker
  @bridge_worker.validate

  # Mutex needed for accessing and replacing a client
  @client_mutex = Mutex.new
end