Method: Temporalio::Worker::ActivityWorker#run

Defined in:
lib/temporalio/worker/activity_worker.rb

#run(reactor) ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/temporalio/worker/activity_worker.rb', line 37

# Polls the worker for activity tasks and handles each one in its own async task.
#
# Every polled task is dispatched inside `reactor.async`: a task carrying a
# `start` payload goes to `handle_start_activity`, one carrying a `cancel`
# payload goes to `handle_cancel_activity`. Polling continues until
# `Temporalio::Bridge::Error::WorkerShutdown` is raised, which is swallowed as
# the normal shutdown signal. Any other error propagates after the cleanup in
# the `ensure` section has run.
#
# On the way out (shutdown or error) the method waits for every task that is
# still outstanding, then for `@cancelation_task` (if set), and finally closes
# `drain_queue`.
#
# @api private
# @param reactor [#async] object used to spawn one async task per polled activity task
def run(reactor)
  # @type var outstanding_tasks: Array[Async::Task]
  outstanding_tasks = []

  loop do
    activity_task = worker.poll_activity_task
    outstanding_tasks << reactor.async do |async_task|
      if activity_task.start
        handle_start_activity(activity_task.task_token, activity_task.start)
      elsif activity_task.cancel
        handle_cancel_activity(activity_task.task_token, activity_task.cancel)
      end
    ensure
      # Finished tasks unregister themselves so the list only holds in-flight work.
      outstanding_tasks.delete(async_task)
    end
  end
rescue Temporalio::Bridge::Error::WorkerShutdown
  # No need to re-raise this error, it's a part of a normal shutdown
ensure
  # Tasks remove themselves from `outstanding_tasks` as they finish, which may
  # happen while this drain is still iterating. Walk a snapshot so that those
  # removals cannot shift the array under `each` and cause tasks to be skipped.
  outstanding_tasks.dup.each(&:wait)
  @cancelation_task&.wait
  drain_queue.close
end