Class: Concurrently::Proc

Inherits:
Proc
  • Object
Includes:
CallbacksAttachable
Defined in:
lib/all/concurrently/proc.rb

Overview

Note:

Concurrent procs are thread safe.

A Concurrently::Proc is like a regular Proc except that its block of code is evaluated concurrently. Its evaluation can wait for other events (e.g. the result of another evaluation or the readiness of an IO) without blocking the execution of its thread.

Errors raised inside concurrent evaluations are re-raised when their result is retrieved with Evaluation#await_result. By default they are also logged to stderr; this behavior can be changed with Proc.error_log_output=. In addition, errors can be watched by registering callbacks for the :error event, as shown in the example below. The callbacks provide a central access point to errors raised inside concurrent evaluations, which is useful for recovery. Concurrent procs evaluated with #call_and_forget run in the background with no access to their evaluation, and if they fail they do so silently. For them, the callbacks are the only way to gain access to their errors.

The callbacks can be registered for all procs or only for one specific proc:

Examples:

Watching errors

# Callbacks for all procs are registered for the `Concurrently::Proc` class:
Concurrently::Proc.on(:error) do |error|
  puts "error in one of many procs: #{error}"
end

concurrently do
  raise "eternal darkness"
end

sunshine_proc = concurrent_proc do
  raise "eternal sunshine"
end

# Callbacks for a single proc are registered for the instance:
sunshine_proc.on(:error) do |error|
  puts "error in the sunshine proc: #{error}"
end

# Defer execution a little. This lets the concurrently block run in the
# meantime.
wait 0
# The concurrently block fails in the background and prints
# "error in one of many procs: eternal darkness"

sunshine_proc.call
# prints "error in one of many procs: eternal sunshine"
# prints "error in the sunshine proc: eternal sunshine"
# raises RuntimeError: eternal sunshine

Since:

  • 1.0.0

Defined Under Namespace

Classes: Evaluation

Constructor Details

#initialize(evaluation_class = Evaluation) ⇒ Proc

A new instance of Concurrently::Proc

Parameters:

  • evaluation_class (Class) (defaults to: Evaluation)

    It can be given a custom class to create evaluation objects. This can be useful if all evaluations for this proc share some custom behavior and it makes sense to create a sub class of Evaluation for them.

Since:

  • 1.0.0



# File 'lib/all/concurrently/proc.rb', line 124

def initialize(evaluation_class = Evaluation)
  @evaluation_class = evaluation_class
end

Class Method Details

.error_log_output=(output) ⇒ Object

Sets the output to which errors in concurrent procs are written.

By default, errors are written to stderr. To disable logging of errors, set the output to nil or false.

Examples:

require 'logger'
Concurrently::Proc.error_log_output = Logger.new(STDOUT)
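
The way the setter treats the different kinds of output can be sketched in plain Ruby without the gem. The helper name error_writer_for is hypothetical and the returned lambda only stands in for the :error callback that the real method registers; this is an illustration of the type dispatch, not the library's implementation. Note that the check is against IO itself, so a StringIO passed directly is rejected, while a Logger wrapping a StringIO is accepted.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper (not part of Concurrently): picks how an error message
# is written depending on the kind of output it is given.
def error_writer_for(output)
  case output
  when Logger     then ->(message) { output.error(message) } # logger API
  when IO         then ->(message) { output.puts(message) }  # plain stream
  when nil, false then nil                                   # logging disabled
  else raise ArgumentError, "output is neither a Logger nor an IO"
  end
end

# A Logger may wrap any writable object, here an in-memory buffer.
buffer = StringIO.new
error_writer_for(Logger.new(buffer)).call("eternal darkness")

# A real IO object, here the write end of a pipe.
reader, pipe = IO.pipe
error_writer_for(pipe).call("eternal sunshine")
pipe.close
io_line = reader.read
reader.close

# nil and false both mean "do not log".
disabled = error_writer_for(nil)
```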

Parameters:

  • output (IO|Logger|false|nil)

Since:

  • 1.2.0



# File 'lib/all/concurrently/proc.rb', line 102

def self.error_log_output=(output)
  if Object.const_defined? :Logger and Logger === output
    @error_handler.cancel if @error_handler
    @error_handler = on(:error){ |error| output.error Proc.default_error_log_message self, error }
  elsif IO === output
    @error_handler.cancel if @error_handler
    @error_handler = on(:error){ |error| output.puts Proc.default_error_log_message self, error }
  elsif !output
    remove_instance_variable(:@error_handler).cancel if @error_handler
  else
    raise Error, "output no logger or IO"
  end
end

Instance Method Details

#call(*args) ⇒ Object Also known as: []

Evaluates the concurrent proc in a blocking manner.

Evaluating the proc this way executes its block of code immediately and blocks the current thread of execution until the result is available.

Examples:

The proc can be evaluated without waiting

add = concurrent_proc do |a, b|
  a + b
end
add.call 5, 8 # => 13

The proc needs to wait to conclude evaluation

time_in = concurrent_proc do |seconds|
  wait seconds
  Time.now
end

Time.now.strftime('%H:%M:%S.%L')          # => "13:47:45.850"
time_in.call(1.5).strftime('%H:%M:%S.%L') # => "13:47:47.351"
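
How the blocking variant builds on the non-blocking one can be modeled with core Ruby fibers alone. The sketch below is a simplified model under stated assumptions: run_nonblock and run_blocking are hypothetical names, a bare Fiber stands in for the Evaluation object, and Fiber.yield stands in for wait. The real #call awaits the result through the event loop instead of resuming the fiber itself.

```ruby
# Runs the block until it either finishes or pauses with Fiber.yield.
# Returns the block's result, or the paused fiber if it is not done yet.
def run_nonblock(*args, &block)
  fiber = Fiber.new(&block)
  outcome = fiber.resume(*args)
  fiber.alive? ? fiber : outcome
end

# Blocking variant: starts like run_nonblock and, if the block paused,
# keeps resuming it until the final result is available.
def run_blocking(*args, &block)
  case immediate = run_nonblock(*args, &block)
  when Fiber
    result = nil
    result = immediate.resume while immediate.alive?
    result
  else
    immediate
  end
end

# Finishes without pausing, so the result is returned directly.
sum = run_blocking(5, 8) { |a, b| a + b }

# Pauses once before producing its result.
delayed = run_blocking(2) do |n|
  Fiber.yield # stands in for `wait`
  n * 21
end
```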

Returns:

  • (Object)

    the result of the evaluation.

Raises:

  • (Exception)

    if the evaluation raises an error.

Since:

  • 1.0.0



# File 'lib/all/concurrently/proc.rb', line 170

def call(*args)
  case immediate_result = call_nonblock(*args)
  when Evaluation
    immediate_result.await_result
  else
    immediate_result
  end
end

#call_and_forget(*args) ⇒ nil

Fire-and-forget variation of #call_detached.

Once called, the evaluation can no longer be controlled. Because no Evaluation instance is created up front, this is slightly faster than #call_detached.

Examples:

add = concurrent_proc do |a, b|
  puts "detached!"
end
add.call_and_forget 5, 8

# we need to enter the event loop to see an effect
wait 0 # prints "detached!"

Returns:

  • (nil)

Since:

  • 1.0.0



# File 'lib/all/concurrently/proc.rb', line 291

def call_and_forget(*args)
  event_loop = EventLoop.current
  # run without creating an Evaluation object at first. It will be created
  # if the proc needs to wait for something.
  event_loop.run_queue.schedule_immediately event_loop.proc_fiber_pool.take_fiber, [self, args], false

  nil
end

#call_detached(*args) ⇒ Evaluation

Evaluates the concurrent proc detached from the current execution thread.

Evaluating the proc this way detaches the evaluation from the current thread of execution and schedules its start during the next iteration of the event loop.

To execute code this way you can also use the shortcut Kernel#concurrently.

Examples:

add = concurrent_proc do |a, b|
  a + b
end
evaluation = add.call_detached 5, 8
evaluation.await_result # => 13
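
The deferred start can be pictured with a toy run queue in plain Ruby. Everything here is an assumption made for illustration: schedule_detached is a hypothetical name, the array stands in for the event loop's run queue, and the hash stands in for the returned Evaluation. It only shows that scheduling returns a handle at once while the work itself runs on a later pass of the loop.

```ruby
# Hypothetical stand-in for scheduling a detached evaluation: queues the
# work and immediately returns a handle that is filled in later.
def schedule_detached(run_queue, *args, &block)
  handle = { concluded: false, result: nil }
  job = lambda do
    handle[:result] = block.call(*args)
    handle[:concluded] = true
  end
  run_queue << job
  handle
end

run_queue = []
handle = schedule_detached(run_queue, 5, 8) { |a, b| a + b }

# Nothing has run yet; the work is only queued.
concluded_before = handle[:concluded]

# One pass of the stand-in event loop runs everything that was scheduled.
run_queue.shift.call until run_queue.empty?
```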

Returns:

  • (Evaluation)

Since:

  • 1.0.0



# File 'lib/all/concurrently/proc.rb', line 268

def call_detached(*args)
  event_loop = EventLoop.current
  evaluation = @evaluation_class.new(event_loop.proc_fiber_pool.take_fiber)
  event_loop.run_queue.schedule_immediately evaluation, [self, args, [evaluation]]
  evaluation
end

#call_nonblock(*args) ⇒ Object, Evaluation

Evaluates the concurrent proc in a non-blocking manner.

Evaluating the proc this way executes its block of code immediately until the result is available or the evaluation needs to wait.

Dealing with this method is similar to dealing with IO#*_nonblock.
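
For comparison, this is the IO#*_nonblock pattern with Ruby's own IO#read_nonblock on a pipe. It uses only core Ruby and is independent of Concurrently; try_read is a hypothetical helper added for the illustration. The point is the shape of the caller's code: a single call returns either the value or a marker saying it would have to wait, and the caller branches on which one it got.

```ruby
reader, writer = IO.pipe

# Classifies the outcome of one non-blocking read attempt.
def try_read(io)
  case chunk = io.read_nonblock(16, exception: false)
  when :wait_readable then [:would_wait, nil] # nothing to read yet
  when nil            then [:closed, nil]     # end of file
  else                     [:ready, chunk]    # data was available right away
  end
end

# The pipe is empty, so the read reports that it would have to wait.
before_write = try_read(reader)

# After writing, the same call returns the data immediately.
writer.write "13"
after_write = try_read(reader)

writer.close
reader.close
```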

Examples:

The proc can be evaluated without waiting

add = concurrent_proc do |a, b|
  a + b
end

case immediate_result = add.call_nonblock(5, 8)
when Concurrently::Evaluation
  # won't happen here
else
  immediate_result # => 13
end

The proc needs to wait to conclude evaluation

time_in = concurrent_proc do |seconds|
  wait seconds
  Time.now
end

Time.now.strftime('%H:%M:%S.%L') # => "15:18:42.439"

case immediate_result = time_in.call_nonblock(1.5)
when Concurrently::Evaluation
  immediate_result.await_result.strftime('%H:%M:%S.%L') # => "15:18:44.577"
else
  # won't happen here
end

Returns:

  • (Object)

    the result of the evaluation if it can be executed without waiting.

  • (Evaluation)

    if the evaluation needs to wait.

Raises:

  • (Exception)

    if the evaluation raises an error.

Since:

  • 1.0.0



# File 'lib/all/concurrently/proc.rb', line 219

def call_nonblock(*args)
  event_loop = EventLoop.current
  run_queue = event_loop.run_queue
  evaluation_bucket = []

  result = begin
    fiber = event_loop.proc_fiber_pool.take_fiber
    # ProcFiberPool#take_fiber might have accessed the current evaluation
    # if it needs to wait for the next iteration to get a fiber. Reset the
    # current evaluation afterwards!
    previous_evaluation = run_queue.current_evaluation
    run_queue.current_evaluation = nil
    run_queue.evaluation_class = @evaluation_class
    fiber.resume [self, args, evaluation_bucket]
  ensure
    run_queue.current_evaluation = previous_evaluation
    run_queue.evaluation_class = nil
  end

  case result
  when Evaluation
    # The proc fiber if the proc cannot be evaluated without waiting.
    # Inject the evaluation into it so it can be concluded later.
    evaluation_bucket << result
    result
  when Exception
    raise result
  else
    result
  end
end