Module: Kernel

Defined in:
lib/all/kernel.rb

Overview

Concurrently adds a few methods to Kernel, which makes them available to every object.

Since:

  • 1.0.0

Instance Method Details

#await_resume!(opts = {}) ⇒ Object (private)

Note:

The exclamation mark in its name stands for: Watch out! This method needs to be complemented with a later call to Concurrently::Evaluation#resume!.

Suspends the current evaluation until it is resumed manually. It can be used inside and outside of concurrent procs.

It must be paired with a later call to Concurrently::Evaluation#resume!, whose argument becomes the return value of await_resume!.

Examples:

Waiting inside a concurrent proc

# Control flow is indicated by (N)

# (1)
evaluation = concurrent_proc do
  # (4)
  await_resume!
  # (7)
end.call_nonblock

# (2)
concurrently do
  # (5)
  puts "I'm running while the outside is waiting!"
  evaluation.resume! :result
  # (6)
end

# (3)
evaluation.await_result # => :result
# (8)

Waiting outside a concurrent proc

# Control flow is indicated by (N)

evaluation = Concurrently::Evaluation.current

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  evaluation.resume! :result
  # (4)
end

# (2)
await_resume! # => :result
# (5)

Waiting with a timeout

await_resume! within: 1
# => raises a Concurrently::Evaluation::TimeoutError after 1 second

Waiting with a timeout and a timeout result

await_resume! within: 0.1, timeout_result: false
# => returns false after 0.1 seconds

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)
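
How the two options and their defaults interact can be sketched in plain Ruby. This is only an illustration under stated assumptions: `resolve_wait_options` and `SketchTimeoutError` are hypothetical names that are not part of Concurrently, and the sketch resolves the options without doing any waiting.

```ruby
# Hypothetical stand-in for Concurrently::Evaluation::TimeoutError.
class SketchTimeoutError < StandardError; end

# Hypothetical helper: returns [seconds, timeout_result] for an options
# hash, following the defaults documented above.
def resolve_wait_options(opts = {})
  seconds = opts.fetch(:within, Float::INFINITY)
  # Without :timeout_result, the error class itself serves as a marker
  # telling the waiting code to raise instead of returning a value.
  timeout_result = opts.fetch(:timeout_result, SketchTimeoutError)
  [seconds, timeout_result]
end

resolve_wait_options                                     # => [Infinity, SketchTimeoutError]
resolve_wait_options(within: 0.1, timeout_result: false) # => [0.1, false]
```

Using `fetch` rather than `||` keeps falsy timeout results such as `false` or `nil` intact, which is why `timeout_result: false` can be returned as is.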

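Returns:

  • (Object) the result passed to Concurrently::Evaluation#resume!, or the :timeout_result if the waiting time is exceeded

Raises:

  • (Concurrently::Evaluation::TimeoutError) if the waiting time given by :within is exceeded and no :timeout_result is given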

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 109

private def await_resume!(opts = {})
  event_loop = Concurrently::EventLoop.current
  run_queue = event_loop.run_queue
  evaluation = run_queue.current_evaluation

  if seconds = opts[:within]
    timeout_result = opts.fetch(:timeout_result, Concurrently::Evaluation::TimeoutError)
    run_queue.schedule_deferred(evaluation, seconds, timeout_result)
  end

  evaluation.instance_variable_set :@waiting, true
  result = case evaluation
  when Concurrently::Proc::Evaluation
    # Yield back to the event loop fiber or the evaluation evaluating this one.
    # Pass along itself to indicate it is not yet fully evaluated.
    Fiber.yield evaluation
  else
    event_loop.fiber.resume
  end
  evaluation.instance_variable_set :@waiting, false

  # If result is this very evaluation it means this evaluation has been evaluated
  # prematurely.
  if evaluation.fiber == result
    run_queue.cancel evaluation # in case the evaluation has already been scheduled to resume

    # Generally, throw-catch is faster than raise-rescue when the call stack
    # actually has to be unwound, i.e. when the throw or raise is invoked.
    # When it is not unwound, a begin block is faster than a catch block.
    # Since we won't jump out of the proc above most of the time, we go
    # with raise. It is rescued in the proc fiber.
    raise Concurrently::Proc::Fiber::Cancelled, '', []
  elsif Concurrently::Evaluation::TimeoutError == result
    raise result, "evaluation timed out after #{seconds} second(s)"
  else
    result
  end
ensure
  if seconds
    run_queue.cancel evaluation
  end
end
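
The source above suspends an evaluation by yielding from its fiber and obtains its result when that fiber is resumed. The hand-off can be sketched with nothing but Ruby's built-in Fiber class. This is a simplified illustration, not Concurrently's implementation: there is no event loop or run queue, and `fiber_handoff_demo` is a hypothetical name.

```ruby
# Records the order of events to make the control flow visible.
def fiber_handoff_demo
  log = []

  worker = Fiber.new do
    log << :worker_started
    # Suspend here. The value passed to the next #resume becomes the
    # return value of Fiber.yield.
    result = Fiber.yield :waiting
    log << [:worker_resumed_with, result]
    result
  end

  state = worker.resume          # runs the worker up to Fiber.yield
  log << [:outside_sees, state]
  final = worker.resume(:result) # resumes the worker with a result
  log << [:worker_returned, final]
  log
end
```

Calling `fiber_handoff_demo` returns the events in the order they happened, showing that the worker only continues once the outside resumes it with a value.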

#concurrent_proc(evaluation_class = Concurrently::Proc::Evaluation) ⇒ Concurrently::Proc (private)

Creates a concurrent proc to execute code concurrently.

This is a shortcut for Concurrently::Proc.new(&block), just as proc(&block) is a shortcut for Proc.new(&block).
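
The shortcut relationship can be illustrated with Ruby's own Proc. In this sketch `make_proc` is a hypothetical helper that forwards its block explicitly; it only mirrors the pattern and says nothing about how Concurrently::Proc is constructed internally.

```ruby
# Hypothetical shortcut: forwards its block to Proc.new, the same way
# proc(&block) relates to Proc.new(&block).
def make_proc(&block)
  Proc.new(&block)
end

doubler = make_proc { |x| x * 2 }
doubler.call(21) # => 42
```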

Examples:

wait_proc = concurrent_proc do |seconds|
  wait seconds
end

wait_proc.call 2 # waits 2 seconds and then resumes

Returns:

  • (Concurrently::Proc)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 39

private def concurrent_proc(evaluation_class = Concurrently::Proc::Evaluation)
  # Concurrently::Proc.new claims the method's block just like Proc.new does
  Concurrently::Proc.new(evaluation_class)
end

#concurrently(*args) ⇒ nil (private)

Executes code concurrently in the background.

This is a shortcut for Concurrently::Proc#call_and_forget.

Examples:

concurrently(a,b,c) do |a,b,c|
  # ...
end

Returns:

  • (nil)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 19

private def concurrently(*args)
  # Concurrently::Proc.new claims the method's block just like Proc.new does
  Concurrently::Proc.new.call_and_forget *args
end

#wait(seconds) ⇒ true (private)

Suspends the current evaluation for the given number of seconds. It can be used inside and outside of concurrent procs.

While waiting, the code jumps to the event loop and executes other concurrent procs that are ready to run in the meantime.
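
The idea of handing control to an event loop while waiting can be sketched with a toy scheduler built on Ruby's Fiber class. Everything here is an assumption made for illustration: `ToyLoop` and `toy_wait_demo` are hypothetical names, the clock is virtual (nothing actually sleeps), and the sketch only covers waiting inside a scheduled block. It is not Concurrently's event loop.

```ruby
# Toy scheduler with a virtual clock. Illustrative only.
class ToyLoop
  attr_reader :now

  def initialize
    @now = 0.0
    @queue = []     # entries: [wake_time, sequence_number, fiber, value]
    @sequence = 0
    @current = nil  # fiber currently being run by the loop
  end

  # Schedule a block to start as soon as the loop gets control.
  def concurrently(&block)
    schedule(Fiber.new(&block), 0, nil)
    nil
  end

  # To be called inside a scheduled block: schedule the running fiber to
  # be resumed with `true` after `seconds`, then yield back to the loop.
  def wait(seconds)
    schedule(@current, seconds, true)
    Fiber.yield
  end

  # Resume scheduled fibers in wake-time order until none are left.
  def run
    until @queue.empty?
      entry = @queue.min_by { |e| [e[0], e[1]] }
      @queue.delete(entry)
      wake_time, _, fiber, value = entry
      @now = wake_time if wake_time > @now # advance the virtual clock
      @current = fiber
      fiber.resume(value)
    end
    @current = nil
  end

  private

  def schedule(fiber, delay, value)
    @sequence += 1
    @queue << [@now + delay, @sequence, fiber, value]
  end
end

# Runs a waiting block next to a non-waiting one and logs the order.
def toy_wait_demo
  log = []
  toy = ToyLoop.new
  toy.concurrently do
    log << [:slow_starts, toy.now]
    toy.wait 1
    log << [:slow_done, toy.now]
  end
  toy.concurrently do
    log << [:fast_runs, toy.now]
  end
  toy.run
  log
end
```

In `toy_wait_demo` the second block runs while the first one is waiting, and the first block continues once the virtual clock reaches its wake time.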

Examples:

Waiting inside a concurrent proc

# Control flow is indicated by (N)

# (1)
wait_proc = concurrent_proc do |seconds|
  # (4)
  wait seconds
  # (6)
  :waited
end

# (2)
concurrently do
  # (5)
  puts "I'm running while the other proc is waiting!"
end

# (3)
wait_proc.call 1 # => :waited
# (7)

Waiting outside a concurrent proc

# Control flow is indicated by (N)

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
end

# (2)
wait 1
# (4)

Returns:

  • (true)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 193

private def wait(seconds)
  run_queue = Concurrently::EventLoop.current.run_queue
  evaluation = run_queue.current_evaluation
  run_queue.schedule_deferred(evaluation, seconds, true)
  await_resume!
ensure
  run_queue.cancel evaluation
end