Class: Temporalio::Worker

Inherits:
Object
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/runner.rb,
lib/temporalio/worker/reactor.rb,
lib/temporalio/worker/sync_worker.rb,
lib/temporalio/worker/activity_runner.rb,
lib/temporalio/worker/activity_worker.rb,
lib/temporalio/worker/thread_pool_executor.rb

Overview

Worker to process activities.

Once created, workers can be run and shut down explicitly via #run and #shutdown.

Defined Under Namespace

Classes: ActivityRunner, ActivityWorker, Reactor, Runner, SyncWorker, ThreadPoolExecutor

Constructor Details

#initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, interceptors: [], max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker

Create a worker to process activities.

Parameters:

  • connection (Temporalio::Connection)

    A connection to be used for this worker.

  • namespace (String)

    A namespace.

  • task_queue (String)

    A task queue.

  • activities (Array<Class>) (defaults to: [])

    A list of activities (subclasses of Activity).

  • data_converter (Temporalio::DataConverter) (defaults to: Temporalio::DataConverter.new)

    Data converter to use for all data conversions to/from payloads.

  • activity_executor (ThreadPoolExecutor) (defaults to: nil)

    Concurrent executor for all activities. Defaults to a ThreadPoolExecutor with `:max_concurrent_activities` available threads.

  • interceptors (Array<Temporalio::Interceptor::ActivityInbound, Temporalio::Interceptor::ActivityOutbound>) (defaults to: [])

    Collection of interceptors for this worker.

  • max_concurrent_activities (Integer) (defaults to: 100)

    Number of concurrently running activities.

  • graceful_shutdown_timeout (Integer) (defaults to: nil)

    Amount of time (in seconds) activities are given after a shutdown to complete before they are cancelled. A default value of `nil` means that activities are never cancelled when handling a shutdown.

Raises:

  • (ArgumentError)

    When no activities have been provided.
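
A minimal construction sketch, assuming the gem is already loaded and that `connection` is an existing Temporalio::Connection. The helper name `build_activity_worker` and the 'default' and 'my-task-queue' values are hypothetical and chosen only for illustration; the positional and keyword arguments are the ones documented above.

```ruby
# Assumes the temporalio gem has already been loaded by the application.
# `build_activity_worker` is a hypothetical helper, not part of the library.
def build_activity_worker(connection, activities)
  # The constructor raises ArgumentError when the activities list is empty,
  # so that error is allowed to propagate to the caller.
  Temporalio::Worker.new(
    connection,
    'default',       # namespace (illustrative value)
    'my-task-queue', # task queue (illustrative value)
    activities: activities,
    max_concurrent_activities: 20, # sizes the default ThreadPoolExecutor
    graceful_shutdown_timeout: 30  # seconds before running activities are cancelled
  )
end
```

Leaving activity_executor unset keeps the default thread pool, so max_concurrent_activities is the only concurrency setting that needs choosing here.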



# File 'lib/temporalio/worker.rb', line 62

def initialize(
  connection,
  namespace,
  task_queue,
  activities: [],
  data_converter: Temporalio::DataConverter.new,
  activity_executor: nil,
  interceptors: [],
  max_concurrent_activities: 100,
  graceful_shutdown_timeout: nil
)
  @started = false
  @shutdown = false
  @mutex = Mutex.new
  @runtime = Temporalio::Runtime.instance
  @activity_executor = activity_executor || ThreadPoolExecutor.new(max_concurrent_activities)
  @core_worker = Temporalio::Bridge::Worker.create(
    @runtime.core_runtime,
    connection.core_connection,
    namespace,
    task_queue,
    0, # maxCachedWorkflows disabled temporarily
    # FIXME: expose enable_non_local_activities
    activities.empty?,
  )
  sync_worker = Worker::SyncWorker.new(@core_worker)
  @activity_worker =
    unless activities.empty?
      Worker::ActivityWorker.new(
        task_queue,
        sync_worker,
        activities,
        data_converter,
        interceptors,
        @activity_executor,
        graceful_shutdown_timeout,
      )
    end

  unless @activity_worker
    raise ArgumentError, 'At least one activity must be specified'
  end
end

Class Method Details

.run(*workers, shutdown_signals: []) { ... } ⇒ Object

Run multiple workers and wait for them to be shut down.

This will not return until shutdown is complete (and all running activities in all workers have finished), and it will raise if any of the workers raises a fatal error.

Parameters:

  • workers (Array<Temporalio::Worker>)

    A list of the workers to be run.

  • shutdown_signals (Array<String>) (defaults to: [])

    A list of process signals for the workers to stop on. This argument cannot be used together with a custom block.

Yields:

  • An optional block. All the workers are shut down once the block returns. Any errors raised from this block will be re-raised by this method.
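
A usage sketch of the two mutually exclusive ways of ending .run, assuming the gem is already loaded and the workers passed in are already-constructed Temporalio::Worker instances. Both helper names are hypothetical.

```ruby
# Assumes the temporalio gem has already been loaded by the application.
# Both helpers are hypothetical wrappers around the documented class method.

# Blocks until the process receives SIGINT or SIGTERM, then shuts every worker down.
def run_until_signalled(*workers)
  Temporalio::Worker.run(*workers, shutdown_signals: %w[INT TERM])
end

# Blocks until `stop_queue` yields a value, then shuts every worker down.
# Passing :shutdown_signals together with a block raises ArgumentError.
def run_until_stopped(stop_queue, *workers)
  Temporalio::Worker.run(*workers) { stop_queue.pop }
end
```

The signal variant suits a long-lived process; the block variant suits tests or scripts where the stop condition is decided in code.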



# File 'lib/temporalio/worker.rb', line 26

def self.run(*workers, shutdown_signals: [], &block)
  unless shutdown_signals.empty?
    if block
      raise ArgumentError, 'Temporalio::Worker.run accepts :shutdown_signals or a block, but not both'
    end

    signal_queue = Queue.new

    shutdown_signals.each do |signal|
      Signal.trap(signal) { signal_queue.close }
    end

    block = -> { signal_queue.pop }
  end

  Runner.new(*workers).run(&block)
end
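
The signal handling in the listing relies on standard-library behaviour: Queue#pop blocks while the queue is open and empty, and returns nil once the queue has been closed. A stand-alone sketch of that behaviour follows, with a thread standing in for the Signal.trap handler; `pop_until_closed` is a hypothetical helper and not part of the library.

```ruby
# Hypothetical helper: closes `queue` after `delay` seconds from another thread,
# standing in for the Signal.trap handler, and returns what the blocked pop sees.
def pop_until_closed(queue, delay)
  closer = Thread.new do
    sleep delay
    queue.close
  end
  value = queue.pop # blocks while the queue is open and empty
  closer.join
  value
end

queue = Queue.new
p pop_until_closed(queue, 0.05) # prints nil
p queue.closed?                 # prints true
```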

Instance Method Details

#run { ... } ⇒ Object

Note:

A worker is only intended to be started once. Initialize a new worker should you need to run it again.

Run the worker and wait on it to be shut down.

This will not return until shutdown is complete (and all running activities have finished), and it will raise if there is a fatal worker error. To run multiple workers, use the class method .run.

Yields:

  • An optional block. The worker shuts itself down once the block returns, so you can use it to stop a worker after some time has passed or after any other arbitrary work has completed. Any errors raised from this block will be re-raised by this method.
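
A small sketch of stopping a worker from the block, assuming `worker` is a Temporalio::Worker. `run_until` is a hypothetical helper that keeps the worker running until a caller-supplied condition becomes true.

```ruby
# `run_until` is a hypothetical helper; `worker` is assumed to respond to #run
# as documented above. The worker shuts itself down when the block returns,
# so the block simply polls the caller's condition.
def run_until(worker, poll_interval: 0.5)
  worker.run do
    # `yield` calls the block given to run_until, not the one given to #run.
    sleep poll_interval until yield
  end
end
```

For example, `run_until(worker) { File.exist?('/tmp/stop-worker') }` would keep the worker polling until that (illustrative) marker file appears.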



# File 'lib/temporalio/worker.rb', line 118

def run(&block)
  Runner.new(self).run(&block)
end

#running? ⇒ Boolean

Whether the worker is running.

This is only `true` if the worker has been started and not yet shut down.

Returns:

  • (Boolean)


# File 'lib/temporalio/worker.rb', line 195

def running?
  @started && !@shutdown
end
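
The lifecycle behind this predicate can be sketched with a stand-in class. `LifecycleSketch` is hypothetical: it mirrors only the two flags seen in the source listings and does not poll for or execute any activities.

```ruby
# Hypothetical stand-in that mirrors the @started / @shutdown flags only.
class LifecycleSketch
  def initialize
    @started = false
    @shutdown = false
  end

  def start
    raise 'Worker is already started' if @started

    @started = true
  end

  def shutdown
    return unless running? # no-op before start and on repeated calls

    @shutdown = true
  end

  def started?
    @started
  end

  def running?
    @started && !@shutdown
  end
end

sketch = LifecycleSketch.new
p sketch.running? # false: never started
sketch.start
p sketch.running? # true: started and not shut down
sketch.shutdown
p sketch.running? # false: shut down
p sketch.started? # true: the flag is never reset, hence the start-once rule
```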

#shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object

Initiate a worker shutdown and wait until complete.

This can be called before the worker has even started and is safe for repeated invocations. This method will not return until the worker has completed shutting down.

Parameters:

  • exception (Exception) (defaults to: Temporalio::Error::WorkerShutdown.new('Manual shutdown'))

    An exception to be raised from the #run or .run methods after the shutdown procedure has completed.
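
A sketch of passing a custom exception, assuming the gem is already loaded and `worker` is a Temporalio::Worker. `shutdown_with_reason` is a hypothetical helper; it builds the same exception class as the default argument, with a different message.

```ruby
# Assumes the temporalio gem has already been loaded by the application.
# `shutdown_with_reason` is a hypothetical helper, not part of the library.
def shutdown_with_reason(worker, reason)
  # The exception is what #run (or .run) raises once shutdown has completed.
  worker.shutdown(Temporalio::Error::WorkerShutdown.new(reason))
end
```

Calling it on a worker that is not running is harmless, since #shutdown is documented as safe before start and for repeated invocations.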



# File 'lib/temporalio/worker.rb', line 161

def shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown'))
  mutex.synchronize do
    return unless running?

    # Let the runner know we're shutting down, so it can stop other workers.
    # This will cause a reentrant call to this method, but the mutex above will block that call.
    runner&.shutdown(exception)

    # Initiate Core shutdown, which will start dropping poll requests
    core_worker.initiate_shutdown
    # Start the graceful activity shutdown timer, which will cancel activities after the timeout
    activity_worker&.setup_graceful_shutdown_timer(runtime.reactor)
    # Wait for workers to drain any outstanding tasks
    activity_worker&.drain
    activity_executor.shutdown
    # Finalize the shutdown by stopping the Core
    core_worker.finalize_shutdown

    @shutdown = true
  end
end

#start(runner = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

A worker is only intended to be started once. Initialize a new worker should you need to start it again.

Start the worker asynchronously in a shared runtime.

This is an internal method for advanced use cases, for those intending to implement their own worker runner.

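Parameters:

  • runner (Temporalio::Worker::Runner) (defaults to: nil)

    A runner to notify when this worker shuts down, so that it can stop the other workers.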



# File 'lib/temporalio/worker.rb', line 133

def start(runner = nil)
  mutex.synchronize do
    raise 'Worker is already started' if started?

    @started = true
  end

  @runner = runner
  runtime.ensure_callback_loop

  runtime.reactor.async do |task|
    if activity_worker
      task.async do |task|
        activity_worker.run(task)
      rescue StandardError => e
        shutdown(e) # initiate shutdown because of a fatal error
      end
    end
  end
end

#started? ⇒ Boolean

Whether the worker has been started.

Returns:

  • (Boolean)


# File 'lib/temporalio/worker.rb', line 186

def started?
  @started
end