An Overview of Concurrently

The README already introduced the basic interface of Concurrently. This document explores the underlying concepts and explains how all parts work together. For even more details and examples about a specific topic follow the interspersed links to the API documentation.

Let's start with the concept of an evaluation.

Evaluations

An evaluation is an independent execution context. It is similar to a thread or a fiber since it can be suspended and resumed independently from other evaluations.

Every ruby program already has an implicit root evaluation running. Unless you explicitly tell your program to evaluate code concurrently it is evaluated in the root evaluation. The root evaluation runs as long as your program is running. Thus it is never concluded and its result cannot be awaited.

Evaluating code with concurrently(&block) is done in its own type of evaluation. Contrary to the root evaluation, this evaluation has an end with a result. Next to its similarity to a thread resp. fiber it is also similar to a future or a promise. It provides access to its (future) result and offers the ability to shortcut its execution by manually injecting a result. Once the evaluation has a result it is concluded.

# This is the root evaluation

concurrently do
  # This is a concurrent evaluation
end

concurrently do
  # This is another concurrent evaluation
end

Concurrent Evaluation of Code

Evaluating a piece of code concurrently involves three distinct phases:

               1      2      3
evaluation0 ---+-------------+--->
               |             |
evaluation1    `--+-----+----ยด
                  |     |
                  t     io
  1. Invocation: evaluation0 kicks off evaluation1 to process the code in. It does it asynchronously by not waiting for evaluation1 to finish.
  2. Computation: evaluation0 and evaluation1 run independently from each other. evaluation1 synchronizes itself with other events (e.g. with time or I/O).
  3. Synchronization: If evaluation0 is interested in the result of evaluation1 it has to wait for it. This synchronizes evaluation1 with evaluation0 again. If evaluation1 has not finished yet evaluation0 blocks until it has.

Every tool Concurrently offers is linked to one of these phases.

Invocation

To start evaluating code concurrently use Kernel#concurrently:

evaluation = concurrently do
  # code to run concurrently
end

It returns immediately with a handle to the started evaluation. The evaluation will be processed in the background.

Kernel#concurrently is actually a shortcut for

evaluation = concurrent_proc do
  # code to run concurrently
end.call_detached

In general, you do not need to work with concurrent procs directly. Just use Kernel#concurrently. But concurrent procs give you a finer control over how the code is evaluated. This comes in handy for optimizing performance.

Concurrent Procs

The concurrent proc looks and feels just like a regular proc. In fact, Concurrently::Proc inherits from Proc. It is created with Kernel#concurrent_proc:

conproc = concurrent_proc do
  # code to run concurrently
end

Concurrent procs can be used the same way regular procs are. For example, they can be passed around or called multiple times with different arguments.

When called a concurrent proc kicks of an evaluation of its code. A concurrent proc has four methods to call it. Depending on which method is used the code is evaluated slightly differently.

The first two methods evaluate the concurrent proc immediately in the foreground:

  • Concurrently::Proc#call blocks the evaluation it has been called from until its own evaluation is concluded. Then it returns the result. This behaves just like Proc#call.
  • Concurrently::Proc#call_nonblock will not block the evaluation it has been called from if it needs to wait. Instead, it immediately returns its own evaluation. If it can be evaluated without waiting it returns the result.

The other two schedule the concurrent proc to be run in the background. The evaluation is not started right away but is deferred until the the next iteration of the event loop:

The different methods to call a concurrent proc have an impact on the execution speed. In general, Concurrently::Proc#call_detached represents a good middle ground between ease of use and performance. For an in-depth analysis of the performance implications of each call method have a look at the performance documentation. It offers a guide what to use if every cpu cycle counts.

Computation

In the computation phase the evaluation works through its code. While doing so it can synchronize itself with different events.

All synchronization methods are named await_*. As usual, there is an exception to the rule: Waiting an amount of time is done with Kernel#wait.

Synchronization with Time

To defer the current evaluation for a fixed amount of time use Kernel#wait.

  • Doing something after X seconds:

    concurrently do
      wait X
      do_it!
    end
    
  • Doing something every X seconds. This is a timer:

    concurrently do
      loop do
        wait X
        do_it!
      end
    end
    
  • Doing something after X seconds, every Y seconds, Z times:

    concurrently do
      wait X
      Z.times do
        do_it!
        wait Y
      end
    end
    
  • Doing something at a given point in time:

    concurrently do
      time = Time.new(2042,7,10, 16,13,26) # 10 July 2042, 16:13:26
      wait (time-Time.now).to_f
      do_it!
    end
    

Synchronization with I/O

To read and write from an IO and wait until the operation is complete without blocking other evaluations use IO#await_read and IO#await_written.

r,w = IO.pipe

concurrently do
  wait 1
  w.await_written "Continue!"
end


# Read from r. It will take one second until there is input because r must
# wait until the string has been written to w.
r.await_read 1024 # prints "Continue!"

r.close
w.close

Other operations like accepting from a server socket need to be done by using the corresponding #*_nonblock methods along with IO#await_readable or IO#await_writable:

require 'socket'

server = UNIXServer.new "/tmp/sock"

begin
  socket = server.accept_nonblock
rescue IO::WaitReadable
  server.await_readable
  retry
end

# socket is an accepted socket.

Synchronization with Results of Evaluations

Results of other evaluations can be waited for with Concurrently::Proc::Evaluation#await_result:

mailbox = concurrently do
  wait 1
  'message'
end

forwarder = concurrently do
  "FW: #{mailbox.await_result}"
end

# It will take one second until there is a message in the mailbox
puts forwarder.await_result # prints "FW: message"

To wait for the fastest in a list of evaluations use Kernel#await_fastest:

mailbox1 = concurrently do
  wait 1
  'slow message'
end

mailbox2 = concurrently do
  wait 0.5
  'fast message'
end

mailbox = await_fastest(mailbox1, mailbox2)
mailbox.await_result # => "fast message"

Synchronization

Synchronizing the invoking evaluation with the result of the invoked one is done as described in the section about synchronizing results of evaluations.

About the Event Loop

To understand when code is run (and when it is not) it is necessary to know a little bit more about the way Concurrently works.

Concurrently lets every thread run an event loop. These event loops work silently in the background and are responsible for watching IOs and scheduling evaluations. Evaluations are scheduled by putting them into a run queue ordered by the time they are supposed to run. The run queue is then worked off sequentially up to the point corresponding to the current time. If two evaluations are scheduled to run at the same time the evaluation scheduled first is run first.

Event loops do not run parallel to your application's code at the exact same time (e.g. on another cpu core). Instead, your code yields to them if it waits for something: The event loop is (and only is) entered if your code calls one of the synchronization methods. Later, when your code can be resumed the event loop schedules the corresponding evaluation to run again.

Keep in mind, that an event loop must never be interrupted, blocked or overloaded. A healthy event loop is one that can respond to new events immediately.

If you are experiencing issues when using Concurrently it is probably due to these properties of event loops. Have a look at the Troubleshooting page.

Implementing a Server Application

This is a blueprint how to build an application listening to a server socket, accepting connections and serving requests through them.

At first, lets implement the server. It is initialized with a socket to listen to. Listening calls the concurrent proc stored in the RECEIVER constant. It then accepts or waits for incoming connections until the server is closed.

class ConcurrentServer
  def initialize(socket)
    @socket = socket
    @listening = false
  end

  def listening?
    @listening
  end

  def listen
    @listening = true
    RECEIVER.call_nonblock self, @socket
  end

  def close
    @listening = false
    @socket.close
  end

  RECEIVER = concurrent_proc do |server, socket|
    while server.listening?
      begin
        Connection.new(socket.accept_nonblock).open
      rescue IO::WaitReadable
        socket.await_readable
        retry
      end
    end
  end
end

The implementation of the connection is structurally similar to the one of the server. But because receiving data is a little bit more complex it is done in an additional receive buffer object. Received requests are processed in their own concurrent proc to not block the receiver loop if request.process calls one of the wait methods.

class ConcurrentServer::Connection
  def initialize(socket)
    @socket = socket
    @receive_buffer = ReceiveBuffer.new socket
    @open = false
  end

  def open?
    @open
  end

  def open
    @open = true
    RECEIVER.call_nonblock self, @receive_buffer
  end

  def close
    @open = false
    @socket.close
  end

  RECEIVER = concurrent_proc do |connection, receive_buffer|
    while connection.open?
      receive_buffer.receive
      receive_buffer.shift_complete_requests.each do |request|
        REQUEST_PROC.call_nonblock request
      end
    end
  end

  REQUEST_PROC = concurrent_proc do |request|
    request.process
  end
end

The receive buffer is responsible for reading from the connection's socket and deserializing the received data.

class ConcurrentServer::Connection::ReceiveBuffer
  def initialize(socket)
    @socket = socket
    @buffer = ''
  end

  def receive
    @buffer << @socket.read_nonblock(32768)
  rescue IO::WaitReadable
    @socket.await_readable
    retry
  end

  def shift_complete_requests
    # Deserializes the buffer according to the used wire protocol, removes
    # the consumed bytes of all completely received requests from the buffer
    # and returns the requests.
  end
end

Finally, this is a script bootstrapping two concurrent servers. The script terminates after both servers were closed.

#!/bin/env ruby

require 'socket'

socket1 = UNIXServer.new "/tmp/sock1"
socket2 = UNIXServer.new "/tmp/sock2"

server_evaluation1 = ConcurrentServer.new(socket1).listen
server_evaluation2 = ConcurrentServer.new(socket2).listen

server_evaluation1.await_result # blocks until server 1 is closed
server_evaluation2.await_result # returns immediately if server 2 is already
                                # closed or blocks until it happens

Keep in mind, that to focus on the use of Concurrently the example does not take error handling for I/O, properly closing all connections and other details into account.