Class: Temporalio::Internal::Worker::MultiRunner
- Inherits:
-
Object
- Object
- Temporalio::Internal::Worker::MultiRunner
show all
- Defined in:
- lib/temporalio/internal/worker/multi_runner.rb
Overview
Primary worker (re)actor-style event handler. It manages multiple workers, receives events from the bridge, and runs a user-supplied block.
Defined Under Namespace
Classes: Event, InjectEventForTesting
Instance Method Summary
collapse
Constructor Details
#initialize(workers:, shutdown_signals:) ⇒ MultiRunner
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 12
# Sets up the runner state, installs the shutdown signal traps and hands the
# workers' bridge counterparts to the bridge for polling.
#
# @param workers [Array] Workers to run; each must respond to `_bridge_worker`.
# @param shutdown_signals [Enumerable] Signals accepted by `Signal.trap`. When any
#   of them is received an `Event::ShutdownSignalReceived` is pushed onto the queue.
def initialize(workers:, shutdown_signals:)
  @shutdown_initiated = false
  @shutdown_initiated_mutex = Mutex.new
  @queue = Queue.new
  @workers = workers

  # The trap handler only enqueues an event; it does nothing else itself.
  shutdown_signals.each do |signal_name|
    Signal.trap(signal_name) do
      @queue.push(Event::ShutdownSignalReceived.new)
    end
  end

  # The bridge receives the same queue that #next_event pops from
  # (presumably so poll results are delivered onto it -- confirm against the bridge).
  bridge_workers = @workers.map { |worker| worker._bridge_worker }
  Bridge::Worker.async_poll_all(bridge_workers, @queue)
end
|
Instance Method Details
#apply_thread_or_fiber_block ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 28
# Runs the given block in the background and reports its outcome on the queue.
#
# Does nothing (and returns nil) when no block is given. Otherwise the block is
# run via `Fiber.schedule` when a fiber scheduler is set for the current thread,
# or in a new `Thread` when not. The started fiber/thread is remembered in
# `@thread_or_fiber` and returned.
#
# Outcomes pushed onto the queue:
# * block returns -> `Event::BlockSuccess` carrying the block's return value
# * block raises `InjectEventForTesting` -> the exception's `event`, followed by
#   an `Event::BlockSuccess` whose result is the exception itself
# * block raises anything else -> `Event::BlockFailure` carrying the error
def apply_thread_or_fiber_block(&)
  return unless block_given?

  # Same body for both execution strategies. `Exception` is rescued on purpose so
  # that every failure is turned into a queue event instead of escaping.
  body = proc do
    @queue.push(Event::BlockSuccess.new(result: yield))
  rescue InjectEventForTesting => e
    @queue.push(e.event)
    @queue.push(Event::BlockSuccess.new(result: e))
  rescue Exception => e
    @queue.push(Event::BlockFailure.new(error: e))
  end

  @thread_or_fiber = Fiber.current_scheduler ? Fiber.schedule(&body) : Thread.new(&body)
end
|
#apply_workflow_activation_complete(workflow_worker:, activation_completion:, encoded:) ⇒ Object
56
57
58
59
60
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 56
# Enqueues an `Event::WorkflowActivationComplete` for the given completion.
#
# The runner's own queue is passed along as `completion_complete_queue`.
#
# @param workflow_worker [Object] Forwarded to the event unchanged.
# @param activation_completion [Object] Forwarded to the event unchanged.
# @param encoded [Object] Forwarded to the event unchanged.
def apply_workflow_activation_complete(workflow_worker:, activation_completion:, encoded:)
  event = Event::WorkflowActivationComplete.new(
    workflow_worker: workflow_worker,
    activation_completion: activation_completion,
    encoded: encoded,
    completion_complete_queue: @queue
  )
  @queue.push(event)
end
|
#apply_workflow_activation_decoded(workflow_worker:, activation:) ⇒ Object
52
53
54
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 52
# Enqueues an `Event::WorkflowActivationDecoded` for the given activation.
#
# @param workflow_worker [Object] Forwarded to the event unchanged.
# @param activation [Object] Forwarded to the event unchanged.
def apply_workflow_activation_decoded(workflow_worker:, activation:)
  event = Event::WorkflowActivationDecoded.new(
    workflow_worker: workflow_worker,
    activation: activation
  )
  @queue.push(event)
end
|
#initiate_shutdown ⇒ Object
Clarify this is the only thread-safe function here
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 67
# Calls `_initiate_shutdown` on every worker, but only the first time this
# method is invoked.
#
# The "already initiated" flag is checked and set while holding a mutex, so
# only one caller ever gets to notify the workers. The workers themselves are
# notified after the mutex has been released.
#
# @return [Array, nil] The workers when this call initiated shutdown, nil when
#   shutdown had already been initiated.
def initiate_shutdown
  first_call = @shutdown_initiated_mutex.synchronize do
    if @shutdown_initiated
      false
    else
      @shutdown_initiated = true
    end
  end

  @workers.each { |worker| worker._initiate_shutdown } if first_call
end
|
#next_event ⇒ Object
Intentionally not an enumerable/enumerator since stop semantics are caller determined
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 88
# Blocks until an item is available on the queue and returns it as an Event.
#
# Intentionally not an enumerable/enumerator since stop semantics are caller
# determined.
#
# Items that already are `Event` instances are returned unchanged. Any other
# item is destructured into up to three positional values and translated:
# * first or second value is nil -> `Event::AllPollersShutDown.instance`
# * first value is -1 -> `Event::WorkflowActivationCompletionComplete` with
#   `run_id:` taken from the second value and `error:` from the third
# * otherwise the first value indexes into `@workers`, the second is passed on
#   as `worker_type`, and the third decides the event:
#   nil -> `Event::PollerShutDown`, an `Exception` -> `Event::PollFailure`,
#   anything else -> `Event::PollSuccess` with the value as `bytes`
#
# NOTE(review): the constant after `Event::` was missing in the reviewed text
# (`Event::.instance`, a syntax error); it is restored here as
# `AllPollersShutDown` -- confirm against the Event namespace.
#
# @return [Event] The next event.
def next_event
  result = @queue.pop
  return result if result.is_a?(Event)

  first, second, third = result
  return Event::AllPollersShutDown.instance if first.nil? || second.nil?
  return Event::WorkflowActivationCompletionComplete.new(run_id: second, error: third) if first == -1

  worker = @workers[first]
  case third
  when nil
    Event::PollerShutDown.new(worker:, worker_type: second)
  when Exception
    Event::PollFailure.new(worker:, worker_type: second, error: third)
  else
    Event::PollSuccess.new(worker:, worker_type: second, bytes: third)
  end
end
|
#raise_in_thread_or_fiber_block(error) ⇒ Object
62
63
64
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 62
# Raises the given error inside the thread or fiber stored in
# `@thread_or_fiber`, if there is one.
#
# @param error [Object] Passed as-is to the stored thread/fiber's `raise`.
# @return [Object, nil] Whatever `raise` on the stored object returns, or nil
#   when nothing is stored.
def raise_in_thread_or_fiber_block(error)
  target = @thread_or_fiber
  return if target.nil?

  target.raise(error)
end
|
#wait_complete_and_finalize_shutdown ⇒ Object
78
79
80
81
82
83
84
|
# File 'lib/temporalio/internal/worker/multi_runner.rb', line 78
# Calls `_wait_all_complete` on every worker in order, then passes all of the
# workers' bridge counterparts to `Bridge::Worker.finalize_shutdown_all`.
#
# @return [Object] Whatever `Bridge::Worker.finalize_shutdown_all` returns.
def wait_complete_and_finalize_shutdown
  @workers.each { |worker| worker._wait_all_complete }

  bridge_workers = @workers.map { |worker| worker._bridge_worker }
  Bridge::Worker.finalize_shutdown_all(bridge_workers)
end
|