Module: Kernel

Defined in:
lib/kernel/sync.rb,
lib/kernel/async.rb,
lib/kernel/barrier.rb

Overview

Extensions to all Ruby objects.

Instance Method Summary collapse

Instance Method Details

#AsyncObject

Run the given block of code in a task, asynchronously, creating a reactor if necessary.

The preferred method to invoke asynchronous behavior at the top level.

  • When invoked within an existing reactor task, it will run the given block

asynchronously. Will return the task once it has been scheduled.

  • When invoked at the top level, will create and run a reactor, and invoke

the block as an asynchronous task. Will block until the reactor finishes running.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kernel/async.rb', line 24

def Async(...)
  if current = ::Async::Task.current?
    return current.async(...)
  elsif scheduler = Fiber.scheduler
    ::Async::Task.run(scheduler, ...)
  else
    # This calls Fiber.set_scheduler(self):
    reactor = ::Async::Reactor.new
    
    begin
      return reactor.run(...)
    ensure
      Fiber.set_scheduler(nil)
    end
  end
end

#Barrier(parent: Async::Idler.new, **options) ⇒ Object

Create a barrier, yield it to the block, and then wait for all tasks to complete.

If no scheduler is running, one will be created automatically for the duration of the block.

By default, the barrier uses an ‘Async::Idler` to manage load, but this can be overridden by providing a different parent or `nil` to disable load management.



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/kernel/barrier.rb', line 20

def Barrier(parent: Async::Idler.new, **options)
  Sync(**options) do |task|
    barrier = ::Async::Barrier.new(parent: parent)
    
    yield barrier
    
    barrier.wait
  ensure
    barrier&.stop
  end
end

#Sync(annotation: nil, &block) ⇒ Object

Run the given block of code synchronously, but within a reactor if not already in one.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/kernel/sync.rb', line 20

def Sync(annotation: nil, &block)
  if task = ::Async::Task.current?
    if annotation
      task.annotate(annotation){yield task}
    else
      yield task
    end
  elsif scheduler = Fiber.scheduler
    ::Async::Task.run(scheduler, &block).wait
  else
    # This calls Fiber.set_scheduler(self):
    reactor = Async::Reactor.new
    
    begin
      # Use finished: false to suppress warnings since we're handling exceptions explicitly
      task = reactor.async(annotation: annotation, finished: false, &block)
      reactor.run
      return task.wait
    ensure
      Fiber.set_scheduler(nil)
    end
  end
end