Method: Temporalio::Worker::ActivityWorker#run
- Defined in:
- lib/temporalio/worker/activity_worker.rb
#run(reactor) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/temporalio/worker/activity_worker.rb', line 37 def run(reactor) # @type var outstanding_tasks: Array[Async::Task] outstanding_tasks = [] loop do activity_task = worker.poll_activity_task outstanding_tasks << reactor.async do |async_task| if activity_task.start handle_start_activity(activity_task.task_token, activity_task.start) elsif activity_task.cancel handle_cancel_activity(activity_task.task_token, activity_task.cancel) end ensure outstanding_tasks.delete(async_task) end end rescue Temporalio::Bridge::Error::WorkerShutdown # No need to re-raise this error, it's a part of a normal shutdown ensure outstanding_tasks.each(&:wait) @cancelation_task&.wait drain_queue.close end |