Class: Temporalio::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb

Overview

Worker for processing activities and workflows on a task queue.

Workers are created for a task queue along with the activities and workflows they can run. Use #run to run a single worker, or Worker.run_all to run a collection of workers. Both run until an optional block completes or a Cancellation is canceled.

Defined Under Namespace

Modules: Interceptor
Classes: ActivityExecutor, Options, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker

Create a new worker. At least one activity or workflow must be present.

Raises:

  • (ArgumentError) — if both activities and workflows are empty.


350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/temporalio/worker.rb', line 350

# Create a new worker. At least one activity or workflow must be present.
#
# Every keyword argument is captured into a frozen {Options} instance (readable via #options). Most of them are also
# forwarded to the bridge worker (Internal::Bridge::Worker), and the workflow-related ones to the internal workflow
# worker when workflows are given.
#
# Construction order: options, bridge worker, interceptor lists, shutdown cancellation, activity/workflow
# sub-workers, and finally bridge validation.
#
# @raise [ArgumentError] If both +activities+ and +workflows+ are empty.
def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  interceptors: [],
  build_id: Worker.default_build_id,
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  use_worker_versioning: false,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  # Fail fast, before any bridge resources are created
  raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?

  # Snapshot of every constructor argument, frozen so it cannot be reassigned after construction
  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    interceptors:,
    build_id:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    use_worker_versioning:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    debug_mode:
  ).freeze

  # Preload workflow definitions and some workflow settings for the bridge
  # (computed even when workflows is empty because the two nondeterminism values feed the bridge options below)
  workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows)
  nondeterminism_as_workflow_fail, nondeterminism_as_workflow_fail_for_types =
    Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options(
      workflow_failure_exception_types:, workflow_definitions:
    )

  # Create the bridge worker
  @bridge_worker = Internal::Bridge::Worker.new(
    client.connection._core_client,
    Internal::Bridge::Worker::Options.new(
      activity: !activities.empty?,
      workflow: !workflows.empty?,
      namespace: client.namespace,
      task_queue:,
      tuner: tuner._to_bridge_options,
      build_id:,
      identity_override: identity,
      max_cached_workflows:,
      max_concurrent_workflow_task_polls:,
      nonsticky_to_sticky_poll_ratio:,
      max_concurrent_activity_task_polls:,
      # For shutdown to work properly, we must disable remote activities
      # ourselves if there are no activities
      no_remote_activities: no_remote_activities || activities.empty?,
      sticky_queue_schedule_to_start_timeout:,
      max_heartbeat_throttle_interval:,
      default_heartbeat_throttle_interval:,
      # Bridge option name differs from the public keyword
      max_worker_activities_per_second: max_activities_per_second,
      max_task_queue_activities_per_second:,
      graceful_shutdown_period:,
      use_worker_versioning:,
      nondeterminism_as_workflow_fail:,
      nondeterminism_as_workflow_fail_for_types:
    )
  )

  # Collect interceptors from client and params (client-level interceptors first, then worker-level ones),
  # split by the interface each one implements; an interceptor implementing both lands in both lists
  @activity_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Activity)
  end
  @workflow_interceptors = (client.options.interceptors + interceptors).select do |i|
    i.is_a?(Interceptor::Workflow)
  end

  # Cancellation for the whole worker
  @worker_shutdown_cancellation = Cancellation.new

  # Create workers
  # NOTE(review): ActivityWorker receives `self` before construction finishes; it presumably reads the ivars set
  # above (@bridge_worker, @activity_interceptors, @worker_shutdown_cancellation) — confirm before reordering.
  unless activities.empty?
    @activity_worker = Internal::Worker::ActivityWorker.new(worker: self,
                                                            bridge_worker: @bridge_worker)
  end
  unless workflows.empty?
    @workflow_worker = Internal::Worker::WorkflowWorker.new(
      bridge_worker: @bridge_worker,
      namespace: client.namespace,
      task_queue:,
      workflow_definitions:,
      workflow_executor:,
      logger:,
      data_converter: client.data_converter,
      metric_meter: client.connection.options.runtime.metric_meter,
      workflow_interceptors: @workflow_interceptors,
      disable_eager_activity_execution:,
      illegal_workflow_calls:,
      workflow_failure_exception_types:,
      workflow_payload_codec_thread_pool:,
      debug_mode:
    )
  end

  # Validate worker (last step, after all sub-workers exist)
  @bridge_worker.validate
end

Instance Attribute Details

#options ⇒ Options (readonly)



279
280
281
# File 'lib/temporalio/worker.rb', line 279

# Frozen snapshot of the options this worker was constructed with.
#
# @return [Options]
def options = @options

Class Method Details

.default_build_id ⇒ String



63
64
65
# File 'lib/temporalio/worker.rb', line 63

# Build ID used when none is passed to the worker constructor.
#
# The value is produced by +_load_default_build_id+ on first use and then cached in a class-level instance variable.
# Loading is retried on later calls only if the cached value is +nil+ or +false+.
#
# @return [String]
def self.default_build_id
  @default_build_id = _load_default_build_id unless @default_build_id
  @default_build_id
end

.default_illegal_workflow_calls ⇒ Hash<String, [:all, Array<Symbol>]>



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/temporalio/worker.rb', line 233

# Default set of calls that are disallowed inside workflow code.
#
# Keys are class/module names. Each value is either +:all+ (every method on that constant is illegal) or an array of
# the specific method names that are illegal. The hash and every array value are frozen, and the result is built
# once and cached in a class-level instance variable.
#
# @return [Hash<String, [:all, Array<Symbol>]>]
def self.default_illegal_workflow_calls
  @default_illegal_workflow_calls ||= begin
    # Symbol arrays must use the %i[] literal; a bare i[...] would be parsed as a call to a method named `i`.
    hash = {
      'BasicSocket' => :all,
      'Date' => %i[initialize today],
      'DateTime' => %i[initialize now],
      'Dir' => :all,
      'Fiber' => [:set_scheduler],
      'File' => :all,
      'FileTest' => :all,
      'FileUtils' => :all,
      'Find' => :all,
      'GC' => :all,
      'IO' => [
        :read
        # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
        # trying to derive whether it's file vs stdout write.
        #:write
      ],
      'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                     spawn srand system test trap],
      'Net::HTTP' => :all,
      'Pathname' => :all,
      # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
      # test executing activities inside a timeout will fail if clock_gettime is blocked.
      'Process' => %i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                      spawn times wait wait2 waitall warmup],
      # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
      # 'Ractor' => :all,
      'Random::Base' => [:initialize],
      'Resolv' => :all,
      'SecureRandom' => :all,
      'Signal' => :all,
      'Socket' => :all,
      'Tempfile' => :all,
      'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                     pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                     terminate thread_variable_set wakeup],
      'Time' => %i[initialize now]
    } #: Hash[String, :all | Array[Symbol]]
    # Freeze the per-key arrays as well as the hash so callers cannot mutate the shared default
    hash.each_value(&:freeze)
    hash.freeze
  end
end

.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This returns the block result if everything is successful, or raises an error if not. See #run for details on how worker shutdown works.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Workers will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.

Raises:

  • (ArgumentError) — if no workers are given or any argument is not a Worker.


102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/temporalio/worker.rb', line 102

# Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the
# workers are shut down. This returns the block result if everything is successful, or raises an error if not. See
# #run for details on how worker shutdown works.
#
# @param workers [Array<Worker>] Workers to run. At least one is required and every element must be a {Worker}.
# @param cancellation [Cancellation] When canceled, shutdown of all workers begins.
# @param shutdown_signals [Array] Signals that begin worker shutdown when received. They are handed to the internal
#   multi runner (element type not visible here — presumably signal names; confirm).
# @param raise_in_block_on_shutdown [Exception, nil] Raised inside a still-running block once the workers have shut
#   down. +nil+ means nothing is raised in the block.
# @param wait_block_complete [Boolean] Whether to wait, after worker shutdown, for a still-running block to finish.
# @yield Optional block run in a new background thread or fiber. Workers shut down when it completes.
# @return [Object, nil] The block result if a block was given and succeeded, otherwise +nil+.
# @raise [ArgumentError] If no workers are given or any argument is not a {Worker}.
# @raise [Exception] The first poll failure or block failure encountered.
def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # Confirm there is at least one and they are all workers
  raise ArgumentError, 'At least one worker required' if workers.empty?
  raise ArgumentError, 'Not all parameters are workers' unless workers.all? { |w| w.is_a?(Worker) }

  Internal::Bridge.assert_fiber_compatibility!

  # Start the multi runner
  runner = Internal::Worker::MultiRunner.new(workers:, shutdown_signals:)

  # Apply block
  runner.apply_thread_or_fiber_block(&block)

  # Reuse first worker logger
  logger = workers.first&.options&.logger or raise # Never nil

  # On cancel, initiate shutdown
  cancellation.add_cancel_callback do
    logger.info('Cancel invoked, beginning worker shutdown')
    runner.initiate_shutdown
  end

  # Poller loop, run until all pollers shut down
  first_error = nil
  block_result = nil
  loop do
    event = runner.next_event
    # TODO(cretz): Consider improving performance instead of this case statement
    case event
    when Internal::Worker::MultiRunner::Event::PollSuccess
      # Successful poll
      event.worker #: Worker
           ._on_poll_bytes(runner, event.worker_type, event.bytes)
    when Internal::Worker::MultiRunner::Event::PollFailure
      # Poll failure, this causes shutdown of all workers
      logger.error('Poll failure (beginning worker shutdown if not already occurring)')
      logger.error(event.error)
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
      # Came back from a codec as decoded
      event.workflow_worker.handle_activation(runner:, activation: event.activation, decoded: true)
    when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
      # An activation is complete
      event.workflow_worker.handle_activation_complete(
        runner:,
        activation_completion: event.activation_completion,
        encoded: event.encoded,
        completion_complete_queue: event.completion_complete_queue
      )
    when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
      # Completion complete, only need to log error if it occurs here
      if event.error
        logger.error("Activation completion failed to record on run ID #{event.run_id}")
        logger.error(event.error)
      end
    when Internal::Worker::MultiRunner::Event::PollerShutDown
      # Individual poller shut down. Nothing to do here until we support
      # worker status or something.
    when Internal::Worker::MultiRunner::Event::AllPollersShutDown
      # This is where we break the loop, no more polling can happen
      break
    when Internal::Worker::MultiRunner::Event::BlockSuccess
      logger.info('Block completed, beginning worker shutdown')
      block_result = event
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::BlockFailure
      logger.error('Block failure (beginning worker shutdown)')
      logger.error(event.error)
      block_result = event
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
      logger.info('Signal received, beginning worker shutdown')
      runner.initiate_shutdown
    else
      raise "Unexpected event: #{event}"
    end
  end

  # Now that all pollers have stopped, let's wait for all to complete
  begin
    runner.wait_complete_and_finalize_shutdown
  rescue StandardError => e
    logger.warn('Failed waiting and finalizing')
    logger.warn(e)
  end

  # If there was a block but not a result yet, we want to raise if that is
  # wanted, and wait if that is wanted
  if block_given? && block_result.nil?
    runner.raise_in_thread_or_fiber_block(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
    if wait_block_complete
      # Keep consuming events until the block actually finishes. A shutdown signal can still be delivered at this
      # point; previously a single such event ended the wait early and returned while the block was still running.
      loop do
        event = runner.next_event
        case event
        when Internal::Worker::MultiRunner::Event::BlockSuccess
          logger.info('Block completed (after worker shutdown)')
          block_result = event
          break
        when Internal::Worker::MultiRunner::Event::BlockFailure
          logger.error('Block failure (after worker shutdown)')
          logger.error(event.error)
          block_result = event
          first_error ||= event.error
          break
        when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
          # Already shutting down; ignore and keep waiting for the block
        else
          raise "Unexpected event: #{event}"
        end
      end
    end
  end

  # Notify each worker we're done with it
  workers.each(&:_on_shutdown_complete)

  # If there was a shutdown-causing error, we raise that; otherwise return the block result (if any)
  if !first_error.nil?
    raise first_error
  elsif block_result.is_a?(Internal::Worker::MultiRunner::Event::BlockSuccess)
    block_result.result
  end
end

Instance Method Details

#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This returns the block result if everything is successful, or raises an error if not.

Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Worker will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.



510
511
512
513
514
515
516
517
518
# File 'lib/temporalio/worker.rb', line 510

# Run this single worker until the cancellation is canceled or the optional block completes.
#
# This is a thin convenience wrapper: every keyword and the block are forwarded unchanged to {Worker.run_all} with
# this worker as the only worker, so its return value and raised errors are exactly those of +run_all+.
#
# @param cancellation [Cancellation] When canceled, worker shutdown begins.
# @param shutdown_signals [Array] Signals that begin worker shutdown when received.
# @param raise_in_block_on_shutdown [Exception, nil] Raised inside a still-running block after shutdown; +nil+ to
#   skip raising.
# @param wait_block_complete [Boolean] Whether to wait for a still-running block after shutdown.
# @yield Optional block run in a new background thread or fiber.
# @return [Object] Whatever {Worker.run_all} returns.
def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(
    self,
    cancellation: cancellation,
    shutdown_signals: shutdown_signals,
    raise_in_block_on_shutdown: raise_in_block_on_shutdown,
    wait_block_complete: wait_block_complete,
    &block
  )
end

#task_queue ⇒ String



490
491
492
# File 'lib/temporalio/worker.rb', line 490

# Task queue this worker polls, as given in the constructor options.
#
# @return [String]
def task_queue = @options.task_queue