Module: Kernel

Defined in:
lib/all/kernel.rb

Overview

Concurrently adds a few methods to Kernel, which makes them available to every object.

Instance Method Details

#await_fastest(evaluation0, evaluation1, *more_evaluations, opts = {}) ⇒ Concurrently::Proc::Evaluation (private)

Waits for the first in a list of evaluations to be concluded.

Examples:

evaluation0 = concurrently{ wait 2 }
evaluation1 = concurrently{ wait 1 }

await_fastest(evaluation0, evaluation1) # => evaluation1

Waiting with a timeout

evaluation0 = concurrently{ wait 2 }
evaluation1 = concurrently{ wait 1 }

await_fastest(evaluation0, evaluation1, within: 0.1)
# => raises a TimeoutError after 0.1 seconds

Waiting with a timeout and a timeout result

evaluation0 = concurrently{ wait 2 }
evaluation1 = concurrently{ wait 1 }

await_fastest(evaluation0, evaluation1, within: 0.1, timeout_result: false)
# => returns false after 0.1 seconds

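Parameters:

  • evaluation0 (Concurrently::Proc::Evaluation)

  • evaluation1 (Concurrently::Proc::Evaluation)

  • more_evaluations (Concurrently::Proc::Evaluation)

  • opts (Hash) (defaults to: {})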

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)
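The interplay of :within and :timeout_result can be imitated with plain Ruby threads. The sketch below only illustrates the option semantics and is not how Concurrently implements them: `fastest_of` and the job names are hypothetical, and the real method works on evaluations inside the event loop rather than on threads.

```ruby
require 'timeout'

# Hypothetical thread-based imitation of the :within / :timeout_result options.
# jobs is a Hash of name => callable; the name of the first job to finish wins.
def fastest_of(jobs, within: nil, timeout_result: Timeout::Error)
  finished = Queue.new
  threads = jobs.map do |name, job|
    Thread.new do
      job.call
      finished << name
    end
  end
  # A nil limit means "wait without a timeout", like the Float::INFINITY default.
  Timeout.timeout(within) { finished.pop }
rescue Timeout::Error
  # Without a custom timeout result the error propagates to the caller.
  raise if timeout_result.equal?(Timeout::Error)
  timeout_result
ensure
  threads&.each(&:kill) # stop the jobs that lost the race
end

jobs = { slow: -> { sleep 1 }, fast: -> { sleep 0.2 } }
fastest_of(jobs)                                      # => :fast
fastest_of(jobs, within: 0.02, timeout_result: false) # => false
```

As in the documented options, passing a timeout result (even false or nil) replaces the error with a return value.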

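Returns:

  • (Concurrently::Proc::Evaluation) the evaluation that concluded first

Raises:

  • (Concurrently::Evaluation::TimeoutError) if the maximum waiting time is exceeded and no timeout result is given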

Since:

  • 1.1.0



# File 'lib/all/kernel.rb', line 227

private def await_fastest(eval0, eval1, *evaluations)
  opts = (evaluations.last.is_a? Hash) ? evaluations.pop : {}
  evaluations.unshift eval0, eval1

  if concluded = evaluations.find(&:concluded?)
    concluded
  else
    begin
      curr_eval = Concurrently::Evaluation.current
      evaluations.each{ |eval| eval.__add_waiting_evaluation__ curr_eval, eval }
      await_resume! opts
    ensure
      evaluations.each{ |eval| eval.__remove_waiting_evaluation__ curr_eval }
    end
  end
end
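The first two lines of the listing separate a trailing options hash from the evaluations. A minimal plain-Ruby sketch of that idiom, using a hypothetical helper name `split_trailing_opts` and symbols in place of evaluations:

```ruby
# Hypothetical helper mirroring how the listing above splits its arguments.
# Keyword-style arguments arrive as a trailing Hash because the method
# declares no keyword parameters.
def split_trailing_opts(first, second, *rest)
  opts = rest.last.is_a?(Hash) ? rest.pop : {}
  rest.unshift(first, second)
  [rest, opts]
end

items, opts = split_trailing_opts(:a, :b, :c, within: 0.1)
items # => [:a, :b, :c]
opts  # => {:within=>0.1}
```

One consequence of this design is that a Hash can never be passed as the last item to wait for, since it would be taken for the options.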

#await_resume!(opts = {}) ⇒ Object (private)

Note:

The exclamation mark in its name stands for: Watch out! This method needs to be complemented with a later call to Concurrently::Evaluation#resume!.

Suspends the current evaluation until it is resumed manually by a later call to Concurrently::Evaluation#resume!.

Examples:

Waiting inside a concurrent evaluation

# Control flow is indicated by (N)

# (1)
evaluation = concurrently do
  # (4)
  await_resume!
  # (7)
end

# (2)
concurrently do
  # (5)
  puts "I'm running while the outside is waiting!"
  evaluation.resume! :result
  # (6)
end

# (3)
evaluation.await_result # => :result
# (8)

Waiting outside a concurrent evaluation

# Control flow is indicated by (N)

evaluation = Concurrently::Evaluation.current

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
  evaluation.resume! :result
  # (4)
end

# (2)
await_resume! # => :result
# (5)

Waiting with a timeout

await_resume! within: 1
# => raises a TimeoutError after 1 second

Waiting with a timeout and a timeout result

await_resume! within: 0.1, timeout_result: false
# => returns false after 0.1 seconds
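Suspend and resume as a plain-Ruby analogy

The hand-over of a result from resume! to the suspended code can be pictured with a bare Fiber. This is only an analogy under the assumption that a Fiber stands in for an evaluation; `suspend_and_resume_demo` is a hypothetical name, and there is no event loop or timeout here.

```ruby
# Plain-Ruby analogy: Fiber.yield plays the role of await_resume!,
# Fiber#resume with an argument plays the role of evaluation.resume!.
def suspend_and_resume_demo
  log = []
  worker = Fiber.new do
    log << :suspended
    result = Fiber.yield # suspended here until resumed with a value
    log << result
    :finished
  end

  worker.resume                  # runs the block up to Fiber.yield
  log << :outside_running        # the caller keeps going meanwhile
  final = worker.resume(:result) # hands :result to the suspended code
  [log, final]
end

suspend_and_resume_demo
# => [[:suspended, :outside_running, :result], :finished]
```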

Parameters:

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :within (Numeric)

    maximum time to wait (defaults to: Float::INFINITY)

  • :timeout_result (Object)

    result to return in case of an exceeded waiting time (defaults to raising Concurrently::Evaluation::TimeoutError)

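Returns:

  • (Object) the result passed to Concurrently::Evaluation#resume!, or the timeout result if one is given and the waiting time is exceeded

Raises:

  • (Concurrently::Evaluation::TimeoutError) if the maximum waiting time is exceeded and no timeout result is given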

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 113

private def await_resume!(opts = {})
  event_loop = Concurrently::EventLoop.current
  run_queue = event_loop.run_queue
  evaluation = run_queue.current_evaluation

  if seconds = opts[:within]
    timeout_result = opts.fetch(:timeout_result, Concurrently::Evaluation::TimeoutError)
    run_queue.schedule_deferred(evaluation, seconds, timeout_result)
  end

  result = evaluation.__suspend__ event_loop.fiber

  if Concurrently::Proc::Evaluation::Cancelled.equal? result
    run_queue.cancel evaluation # in case the evaluation has already been scheduled to resume
    raise Concurrently::Proc::Evaluation::Cancelled, '' # TODO: add empty backtrace as last argument once mruby supports it
  elsif Concurrently::Evaluation::TimeoutError.equal? result
    raise Concurrently::Evaluation::TimeoutError, "evaluation timed out after #{seconds} second(s)"
  else
    result
  end
ensure
  if seconds
    run_queue.cancel evaluation
  end
end
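The listing uses the error class itself as the marker for "no timeout result given", looked up with Hash#fetch. A small sketch of why that matters, using the hypothetical names `SketchTimeoutError`, `timeout_value` and `finish_wait` in place of the library's own:

```ruby
# Hypothetical stand-in for Concurrently::Evaluation::TimeoutError.
class SketchTimeoutError < StandardError; end

# Like opts.fetch(:timeout_result, ...) above: only a missing key selects
# the error class, so nil and false remain usable timeout results.
def timeout_value(opts)
  opts.fetch(:timeout_result, SketchTimeoutError)
end

# Like the branch after resuming: the class itself means "raise".
def finish_wait(result, seconds)
  if SketchTimeoutError.equal?(result)
    raise SketchTimeoutError, "evaluation timed out after #{seconds} second(s)"
  end
  result
end

timeout_value({})                                      # => SketchTimeoutError
timeout_value(timeout_result: nil)                     # => nil
finish_wait(timeout_value(timeout_result: false), 0.1) # => false
```

Had the lookup used `opts[:timeout_result] || SketchTimeoutError` instead, a caller asking for false or nil would get an exception.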

#concurrent_proc(evaluation_class = Concurrently::Proc::Evaluation) ⇒ Concurrently::Proc (private)

Creates a concurrent proc to execute code concurrently.

This is a shortcut for Concurrently::Proc.new(&block), just as proc(&block) is a shortcut for Proc.new(&block).

Examples:

wait_proc = concurrent_proc do |seconds|
  wait seconds
end

wait_proc.call 2 # waits 2 seconds and then resumes

Returns:

  • (Concurrently::Proc)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 42

private def concurrent_proc(evaluation_class = Concurrently::Proc::Evaluation)
  # Concurrently::Proc.new claims the method's block just like Proc.new does
  Concurrently::Proc.new(evaluation_class)
end
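The comment in the listing relies on the constructor picking up the enclosing method's block implicitly. Core Ruby's own Proc.new no longer does this: calling it without a block is deprecated in Ruby 2.7 and raises an ArgumentError from Ruby 3.0 on. How Concurrently::Proc.new behaves on a given Ruby version is not shown here. The sketch below, with the hypothetical names `SketchProc` and `sketch_proc`, only illustrates the explicit alternative of naming the block and forwarding it.

```ruby
# Hypothetical stand-in for a proc-like class; not Concurrently::Proc itself.
class SketchProc
  attr_reader :label

  def initialize(label = :default, &block)
    raise ArgumentError, 'no block given' unless block
    @label = label
    @block = block
  end

  def call(*args)
    @block.call(*args)
  end
end

# The helper names its block parameter and forwards it explicitly.
def sketch_proc(label = :default, &block)
  SketchProc.new(label, &block)
end

doubler = sketch_proc { |n| n * 2 }
doubler.call(21) # => 42
```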

#concurrently(*args) ⇒ Evaluation (private)

Executes code concurrently in the background.

This is a shortcut for Concurrently::Proc#call_detached.

Examples:

concurrently(a,b,c) do |a,b,c|
  # ...
end

Returns:

  • (Evaluation)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 20

private def concurrently(*args)
  # Concurrently::Proc.new claims the method's block just like Proc.new does
  Concurrently::Proc.new.call_detached *args
end
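The control-flow markers in the examples on this page suggest that the block given to concurrently starts only once the calling code yields control. A toy model of that "start later, with these arguments" behaviour, assuming a hypothetical `SketchQueue` class in place of the real event loop:

```ruby
# Hypothetical miniature run queue; illustrative only.
class SketchQueue
  def initialize
    @pending = []
  end

  # Remember the block and its arguments instead of running them right away.
  def detached(*args, &block)
    fiber = Fiber.new(&block)
    @pending << -> { fiber.resume(*args) }
    fiber
  end

  # Stand-in for the event loop: run everything queued so far.
  def drain
    @pending.shift.call until @pending.empty?
  end
end

def detached_demo
  order = []
  queue = SketchQueue.new
  order << :before
  queue.detached(1, 2) { |a, b| order << [:inside, a + b] }
  order << :after
  queue.drain
  order
end

detached_demo # => [:before, :after, [:inside, 3]]
```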

#wait(seconds) ⇒ true (private)

Suspends the current evaluation for the given number of seconds.

While waiting, the code jumps to the event loop and executes other evaluations that are ready to run in the meantime.

Examples:

Waiting inside a concurrent evaluation

# Control flow is indicated by (N)

# (1)
wait_eval = concurrently do
  # (4)
  wait 1
  # (6)
  :waited
end

# (2)
concurrently do
  # (5)
  puts "I'm running while the other proc is waiting!"
end

# (3)
wait_eval.await_result # => :waited
# (7)

Waiting outside a concurrent evaluation

# Control flow is indicated by (N)

# (1)
concurrently do
  # (3)
  puts "I'm running while the outside is waiting!"
end

# (2)
wait 1
# (4)

Returns:

  • (true)

Since:

  • 1.0.0



# File 'lib/all/kernel.rb', line 181

private def wait(seconds)
  run_queue = Concurrently::EventLoop.current.run_queue
  evaluation = run_queue.current_evaluation
  run_queue.schedule_deferred(evaluation, seconds, true)
  await_resume!
ensure
  run_queue.cancel evaluation
end
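The listing schedules the current evaluation to be resumed with true after the given delay and then suspends it. The same shape can be sketched with a toy event loop built on fibers. `ToyLoop` and `toy_wait_demo` are hypothetical names, and unlike the real method this toy `wait` only works inside a spawned block.

```ruby
require 'fiber' # Fiber.current needs this on Ruby versions before 3.1

# Hypothetical toy event loop; names and structure are illustrative only.
class ToyLoop
  def initialize
    @timers = [] # entries: [wake_time, sequence, fiber, value]
    @sequence = 0
  end

  # Queue a block to start as soon as the loop runs.
  def spawn(&block)
    schedule(Fiber.new(&block), 0, nil)
  end

  # Schedule the calling fiber to be resumed with true, then suspend it.
  def wait(seconds)
    schedule(Fiber.current, seconds, true)
    Fiber.yield # control returns to #run, which may resume other fibers
  end

  # Resume fibers in wake-time order until nothing is scheduled.
  def run
    until @timers.empty?
      @timers.sort_by! { |timer| timer.first(2) }
      wake_at, _sequence, fiber, value = @timers.shift
      delay = wake_at - now
      sleep(delay) if delay > 0
      fiber.resume(value)
    end
  end

  private

  def schedule(fiber, seconds, value)
    @timers << [now + seconds, @sequence += 1, fiber, value]
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

def toy_wait_demo
  log = []
  toy = ToyLoop.new
  toy.spawn { woke = toy.wait(0.05); log << [:waited, woke] }
  toy.spawn { log << :ran_meanwhile }
  toy.run
  log
end

toy_wait_demo # => [:ran_meanwhile, [:waited, true]]
```

The second block runs while the first one is waiting, and the waiting block receives true once its timer fires.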