Class: Concurrently::Proc
- Inherits: Proc (Object → Proc → Concurrently::Proc)
- Includes: CallbacksAttachable
- Defined in: lib/all/concurrently/proc.rb
Overview
Concurrent procs are thread safe.
A Concurrently::Proc is like a regular Proc except that its block of code is
evaluated concurrently. Its evaluation can wait for other events (e.g. the
result of another evaluation or the readiness of an IO) without blocking the
execution of its thread.
Errors raised inside concurrent evaluations are re-raised when getting
their result with Evaluation#await_result. They can also be watched by
registering callbacks for the :error event, as shown in the example below.
This is useful as a central hook to all errors inside concurrent
evaluations for monitoring or logging purposes. Also, concurrent procs
evaluated with #call_and_forget are evaluated in the background with
no access to their evaluation and will fail silently. The callbacks are the
only way to be notified about errors inside them.
The callbacks can be registered for all procs or only for one specific proc:
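As a rough illustration of the two registration scopes, here is a stdlib-only sketch. It does not use the concurrently gem: ToyProc, its .on and #on methods, and the way callbacks are dispatched are all assumptions made for illustration, so the real interface should be taken from the CallbacksAttachable documentation.

```ruby
# Toy stand-in for the callback scopes described above. ToyProc is
# hypothetical and is NOT the gem's implementation.
class ToyProc
  # Callbacks shared by every instance, keyed by event name.
  @class_callbacks = Hash.new { |hash, event| hash[event] = [] }

  class << self
    attr_reader :class_callbacks

    # Register a callback that fires for all ToyProc instances.
    def on(event, &callback)
      class_callbacks[event] << callback
    end
  end

  def initialize(&block)
    @block = block
    @callbacks = Hash.new { |hash, event| hash[event] = [] }
  end

  # Register a callback that fires for this instance only.
  def on(event, &callback)
    @callbacks[event] << callback
  end

  # Fire-and-forget style call: the error is swallowed, so the callbacks
  # are the only place where it becomes visible.
  def call_and_forget(*args)
    @block.call(*args)
    nil
  rescue StandardError => error
    (self.class.class_callbacks[:error] + @callbacks[:error]).each do |callback|
      callback.call(error)
    end
    nil
  end
end

SEEN = []

# Class-wide hook, e.g. for central logging.
ToyProc.on(:error) { |error| SEEN << "all: #{error.message}" }

# Hook for one specific proc.
failing = ToyProc.new { raise "eternal darkness" }
failing.on(:error) { |error| SEEN << "one: #{error.message}" }

other = ToyProc.new { raise "boom" }

failing.call_and_forget
other.call_and_forget
```

In this sketch the class-wide callback sees both errors, while the per-instance callback only sees the error raised by the proc it was attached to.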
Defined Under Namespace
Classes: Evaluation
Instance Method Summary
- #call(*args) ⇒ Object (also: #[])
  Evaluates the concurrent proc in a blocking manner.
- #call_and_forget(*args) ⇒ nil
  Fire and forget variation of #call_detached.
- #call_detached(*args) ⇒ Evaluation
  Evaluates the concurrent proc detached from the current execution thread.
- #call_nonblock(*args) ⇒ Object, Evaluation
  Evaluates the concurrent proc in a non-blocking manner.
- #initialize(evaluation_class = Evaluation) ⇒ Proc (constructor)
  A new instance of Proc.
Constructor Details
#initialize(evaluation_class = Evaluation) ⇒ Proc
A new instance of Concurrently::Proc
    # File 'lib/all/concurrently/proc.rb', line 77
    def initialize(evaluation_class = Evaluation)
      @evaluation_class = evaluation_class
    end
Instance Method Details
#call(*args) ⇒ Object Also known as: []
Evaluates the concurrent proc in a blocking manner.
Evaluating the proc this way executes its block of code immediately and blocks the current thread of execution until the result is available.
    # File 'lib/all/concurrently/proc.rb', line 103
    def call(*args)
      case immediate_result = call_nonblock(*args)
      when Evaluation
        immediate_result.await_result
      else
        immediate_result
      end
    end
#call_and_forget(*args) ⇒ nil
Fire and forget variation of #call_detached.
Once called, there is no way to control the evaluation anymore. However, because it skips creating an Evaluation instance, it is slightly faster than #call_detached.
    # File 'lib/all/concurrently/proc.rb', line 224
    def call_and_forget(*args)
      event_loop = EventLoop.current
      # run without creating an Evaluation object at first. It will be created
      # if the proc needs to wait for something.
      event_loop.run_queue.schedule_immediately event_loop.proc_fiber_pool.take_fiber, [self, args], false

      nil
    end
#call_detached(*args) ⇒ Evaluation
Evaluates the concurrent proc detached from the current execution thread.
Evaluating the proc this way detaches the evaluation from the current thread of execution and schedules its start during the next iteration of the event loop.
To execute code this way you can also use the shortcut Kernel#concurrently.
    # File 'lib/all/concurrently/proc.rb', line 201
    def call_detached(*args)
      event_loop = EventLoop.current
      evaluation = @evaluation_class.new(event_loop.proc_fiber_pool.take_fiber)
      event_loop.run_queue.schedule_immediately evaluation, [self, args, [evaluation]]
      evaluation
    end
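The "start during the next iteration of the event loop" behavior can be pictured with a small stdlib-only model. ToyLoop, its run_iteration method, and the hash used as a result handle are hypothetical stand-ins; the gem's EventLoop, run queue, and Evaluation objects work differently in detail.

```ruby
# Toy model of detached scheduling. ToyLoop is hypothetical and only
# illustrates that a detached call is queued now and started later.
class ToyLoop
  def initialize
    @queue = []
  end

  # Queue the block instead of running it and return a handle that will
  # hold the result once the loop has run the block.
  def call_detached(*args, &block)
    handle = { done: false, result: nil }
    @queue << lambda do
      handle[:result] = block.call(*args)
      handle[:done] = true
    end
    handle
  end

  # One loop iteration: run everything queued before the iteration began.
  def run_iteration
    batch, @queue = @queue, []
    batch.each(&:call)
  end
end

ORDER = []
toy_loop = ToyLoop.new

HANDLE = toy_loop.call_detached(21) do |n|
  ORDER << :detached
  n * 2
end

ORDER << :caller        # the caller continues before the block has started
toy_loop.run_iteration  # the detached block runs here
```

After the iteration, ORDER is [:caller, :detached] and HANDLE[:result] is 42: the caller got its handle back immediately and the block only ran once the loop took its turn.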
#call_nonblock(*args) ⇒ Object, Evaluation
Evaluates the concurrent proc in a non-blocking manner.
Evaluating the proc this way executes its block of code immediately until the result is available or the evaluation needs to wait.
Dealing with this method is similar to dealing with IO#*_nonblock.
    # File 'lib/all/concurrently/proc.rb', line 152
    def call_nonblock(*args)
      event_loop = EventLoop.current
      run_queue = event_loop.run_queue
      evaluation_bucket = []

      result = begin
        fiber = event_loop.proc_fiber_pool.take_fiber
        # ProcFiberPool#take_fiber might have accessed the current evaluation
        # if it needs to wait for the next iteration to get a fiber. Reset the
        # current evaluation afterwards!
        previous_evaluation = run_queue.current_evaluation
        run_queue.current_evaluation = nil
        run_queue.evaluation_class = @evaluation_class
        fiber.resume [self, args, evaluation_bucket]
      ensure
        run_queue.current_evaluation = previous_evaluation
        run_queue.evaluation_class = nil
      end

      case result
      when Evaluation
        # The proc fiber if the proc cannot be evaluated without waiting.
        # Inject the evaluation into it so it can be concluded later.
        evaluation_bucket << result
        result
      when Exception
        raise result
      else
        result
      end
    end
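The "result or evaluation" return value can be sketched with a plain Ruby Fiber. This is a simplified, stdlib-only model: Pending, its conclude method, and call_nonblock_sketch are hypothetical names, and the fiber pool, run queue, and error handling of the real method are left out.

```ruby
require "fiber" # Fiber#alive? needs this on older Rubies; harmless on newer ones

# Hypothetical handle for an evaluation that had to wait.
Pending = Struct.new(:fiber) do
  # Resume the suspended block with the awaited value and return its result.
  def conclude(value)
    fiber.resume(value)
  end
end

# Run the block immediately inside a fiber. Return its result if it
# finished, or a Pending handle if it yielded in order to wait.
def call_nonblock_sketch(*args, &block)
  fiber = Fiber.new { block.call(*args) }
  result = fiber.resume
  fiber.alive? ? Pending.new(fiber) : result
end

# Finishes without waiting, so the plain result comes back.
IMMEDIATE = call_nonblock_sketch(20, 22) { |a, b| a + b }

# Has to wait for a value, so a handle comes back instead.
DEFERRED = call_nonblock_sketch(20) { |a| a + Fiber.yield }
DEFERRED_RESULT = DEFERRED.conclude(22)
```

Here IMMEDIATE is 42 right away, while DEFERRED is a Pending handle that yields 42 only after conclude supplies the missing value. A caller therefore has to check which of the two it received, much like the case statement in #call does.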