Tribe

Tribe is a Ruby gem that implements event-driven actors. Actors are lightweight concurrent objects that use asynchronous message passing for communication.

Tribe focuses on high performance, low latency, a simple API, and flexibility. Its goal is to support at least one million actors running on a small group of threads. It is built on top of the Workers gem.

Event-driven servers can be built using Tribe EM.

Installation

Add this line to your application's Gemfile:

gem 'tribe'

And then execute:

$ bundle

Or install it yourself as:

$ gem install tribe

Actors

Actors are lightweight objects that use asynchronous message passing for communication. There are two types of methods that you create in your actors:

  1. Command handlers are prefixed with "on_" and define the types of commands your actor will process.
  2. System handlers are suffixed with "_handler" and are built into the actor system. These are used for exception, shutdown, and cleanup handling. When you override one, be sure to call super, since the actor system relies on the default behavior.

To send a message you use the "enqueue" method and specify a command with an optional data parameter. The return value will always be nil since messaging is asynchronous.

# Create your custom actor class.
class MyActor < Tribe::Actor
  private
  def initialize(options = {})
    super
  end

  def on_my_custom(event)
    puts "Received a custom event (#{event.inspect})"
  end

  def exception_handler(e)
    super
    puts concat_e("MyActor (#{identifier}) died.", e)
  end

  def shutdown_handler(event)
    super
    puts "MyActor (#{identifier}) is shutting down.  Put cleanup code here."
  end
end

# Create some named actors.
100.times do |i|
  MyActor.new(:name => "my_actor_#{i}")
end

# Send an event to each actor.
100.times do |i|
  actor = Tribe.registry["my_actor_#{i}"]
  actor.enqueue(:my_custom, 'hello world')
end

# Shut down the actors.
100.times do |i|
  actor = Tribe.registry["my_actor_#{i}"]
  actor.enqueue(:shutdown)
end
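
The "on_" naming convention can be illustrated without Tribe at all. The following is a minimal, single-threaded sketch and not Tribe's actual implementation: ToyActor, its Event struct, and process_all are hypothetical names invented for this illustration. It only shows how a command such as :my_custom could be mapped to a private on_my_custom method, and why enqueue can return nil right away.

```ruby
# Hypothetical stand-in for an actor; NOT Tribe's implementation.
class ToyActor
  Event = Struct.new(:command, :data)

  attr_reader :log

  def initialize
    @mailbox = Queue.new # Thread-safe FIFO from Ruby's core library.
    @log = []
  end

  # Store the event and return nil immediately; nothing is processed here.
  def enqueue(command, data = nil)
    @mailbox.push(Event.new(command, data))
    nil
  end

  # Drain the mailbox, dispatching each event to its "on_" handler.
  def process_all
    until @mailbox.empty?
      event = @mailbox.pop
      send("on_#{event.command}", event) # send can call private methods.
    end
  end

  private

  def on_my_custom(event)
    @log << "custom: #{event.data}"
  end
end

actor = ToyActor.new
result = actor.enqueue(:my_custom, 'hello world')
actor.process_all

puts result.inspect    # nil
puts actor.log.inspect # ["custom: hello world"]
```

In the sketch the caller drains the mailbox by hand. In Tribe the events are executed by the thread pool instead, as described under Implementation below.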

Implementation

Because actors use a shared thread pool, it is important that they don't block for long periods of time (short periods are fine). Actors that block for long periods of time should use a dedicated thread (:dedicated => true or subclass from Tribe::DedicatedActor).

Options (defaults below)

actor = Tribe::Actor.new(
  :logger => nil,                   # Ruby logger instance.
  :dedicated => false,              # If true, the actor runs with a worker pool that has one thread.
  :pool => Workers.pool,            # The workers pool used to execute events.
  :mailbox => Tribe::Mailbox.new,   # The mailbox used to receive events.
  :registry => Tribe.registry,      # The registry used to store a reference to the actor if it has a name.
  :name => nil                      # The name of the actor (must be unique in the registry).
)

Registries

Registries hold references to named actors so that you can easily find them. In general you shouldn't have to create your own since there is a global one (Tribe.registry).

actor = Tribe::Actor.new(:name => 'some_actor')

if actor == Tribe.registry['some_actor']
  puts 'Successfully found some_actor in the registry.'
end
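
Conceptually, a registry is a thread-safe map from names to actors. The sketch below is plain Ruby and not Tribe's registry class: ToyRegistry and its register and unregister methods are hypothetical, and raising ArgumentError on a duplicate name is an assumption made for the example (this README only states that names must be unique).

```ruby
# Hypothetical registry sketch; NOT Tribe's implementation.
class ToyRegistry
  def initialize
    @mutex = Mutex.new
    @actors = {}
  end

  # Store an actor under a unique name; reject duplicates (assumed behavior).
  def register(name, actor)
    @mutex.synchronize do
      raise ArgumentError, "name already taken: #{name}" if @actors.key?(name)
      @actors[name] = actor
    end
  end

  # Remove the entry for a name and return the actor that was stored.
  def unregister(name)
    @mutex.synchronize { @actors.delete(name) }
  end

  # Look up an actor by name; returns nil when the name is unknown.
  def [](name)
    @mutex.synchronize { @actors[name] }
  end
end

registry = ToyRegistry.new
worker = Object.new
registry.register('some_actor', worker)
found = registry['some_actor'].equal?(worker)

duplicate_rejected = begin
  registry.register('some_actor', Object.new)
  false
rescue ArgumentError
  true
end

puts found              # true
puts duplicate_rejected # true
```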

Timers

Actors can create timers to perform some work in the future. Both one-shot and periodic timers are provided.

class MyActor < Tribe::Actor
  private
  def initialize(options = {})
    super

    timer(1, :timer, Time.now)
    periodic_timer(1, :periodic_timer, Time.now)
  end

  def on_timer(event)
    puts "MyActor (#{identifier}) ONE-SHOT: #{event.data}"
  end

  def on_periodic_timer(event)
    puts "MyActor (#{identifier}) PERIODIC: #{event.data}"
  end
end

# Create some named actors.
10.times do |i|
  MyActor.new(:name => "my_actor_#{i}")
end

# Sleep in order to observe the timers.
sleep 10

# Shut down the actors.
10.times do |i|
  actor = Tribe.registry["my_actor_#{i}"]
  actor.enqueue(:shutdown)
end

Futures

As mentioned above, message passing with the "enqueue" method is asynchronous and always returns nil. This can be a pain since in many cases you will be interested in the result.

The "enqueue_future" method helps solve this problem by returning a Tribe::Future object instead of nil. You can then use this object to obtain the result when it becomes available sometime in the future.

Tribe includes both blocking and non-blocking futures. You should prefer to use non-blocking futures for performance reasons (see details below).

In situations where an actor dies, your future will receive the raised exception as the result.
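
To make the idea concrete, here is a minimal future written in plain Ruby. It is a sketch under stated assumptions and not Tribe::Future: the ToyFuture class and its on_complete method are hypothetical names. It shows the two styles covered in the next sections, a callback that fires when the result arrives and a wait that sleeps until then, and it treats an exception stored as the result as a failure.

```ruby
# Hypothetical future sketch; NOT Tribe::Future.
class ToyFuture
  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @finished = false
    @result = nil
    @callbacks = []
  end

  # Called by the producer exactly once, with a value or an exception.
  def result=(value)
    callbacks = @mutex.synchronize do
      raise 'result already set' if @finished
      @result = value
      @finished = true
      @condition.broadcast # Wake every thread sleeping in #wait.
      @callbacks.dup
    end
    callbacks.each { |callback| callback.call(value) }
  end

  # True only when finished and the result is not an exception.
  def success?
    @mutex.synchronize { @finished && !@result.is_a?(Exception) }
  end

  # Blocking style: sleep until the producer has set a result.
  def wait
    @mutex.synchronize do
      @condition.wait(@mutex) until @finished
      @result
    end
  end

  # Non-blocking style: run the block once a result is available.
  def on_complete(&block)
    run_now = @mutex.synchronize do
      @callbacks << block unless @finished
      @finished
    end
    block.call(@result) if run_now
  end
end

future = ToyFuture.new
seen = []
future.on_complete { |value| seen << value }

producer = Thread.new { future.result = (1..10).reduce(:*) }
value = future.wait
producer.join

failed = ToyFuture.new
failed.result = RuntimeError.new('actor died')

puts value           # 3628800
puts future.success? # true
puts failed.success? # false
```

Note that the callback in this sketch runs on the producer's thread, not on the thread that registered it.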

Non-blocking

Non-blocking futures are asynchronous and use callbacks. No waiting for a result is involved. The actor will continue to process other events.

class ActorA < Tribe::Actor
  private
  def exception_handler(e)
    super
    puts concat_e("ActorA (#{identifier}) died.", e)
  end

  def on_start(event)
    friend = registry['actor_b']

    future = friend.enqueue_future(:compute, 10)

    future.success do |result|
      perform do
        puts "ActorA (#{identifier}) future result: #{result}"
      end
    end

    future.failure do |exception|
      perform do
        puts "ActorA (#{identifier}) future failure: #{exception}"
      end
    end
  end
end

class ActorB < Tribe::Actor
  def exception_handler(e)
    super
    puts concat_e("ActorB (#{identifier}) died.", e)
  end

  def on_compute(event)
    return factorial(event.data)
  end

  def factorial(num)
    return 1 if num <= 0
    return num * factorial(num - 1)
  end
end

actor_a = ActorA.new(:name => 'actor_a')
actor_b = ActorB.new(:name => 'actor_b')

actor_a.enqueue(:start)

actor_a.enqueue(:shutdown)
actor_b.enqueue(:shutdown)

Important: You must use Actor#perform inside the above callbacks. This ensures that your code executes within the context of the correct actor. Without it, the callback body may run outside that actor's context, where it can race with the actor's own event handling.
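
The reason for this rule can be sketched in plain Ruby. The code below is an illustration and not how Tribe implements Actor#perform: ThreadedToyActor and its shutdown method are hypothetical, and giving the toy actor its own thread is a simplification (Tribe actors normally share a pool). The point is that a callback fires on whichever thread completes the future, while a block handed to perform is queued and runs where the actor processes its events.

```ruby
# Hypothetical sketch; NOT Tribe's implementation of Actor#perform.
class ThreadedToyActor
  attr_reader :thread

  def initialize
    @mailbox = Queue.new
    # One thread drains the mailbox, so queued blocks run one at a time.
    @thread = Thread.new do
      while (work = @mailbox.pop) != :stop
        work.call
      end
    end
  end

  # Hand a block to the actor so it runs on the actor's own thread.
  def perform(&block)
    @mailbox.push(block)
    nil
  end

  # Ask the mailbox loop to exit and wait for queued work to finish.
  def shutdown
    @mailbox.push(:stop)
    @thread.join
  end
end

actor = ThreadedToyActor.new
ran_on = {}

# Simulate a future callback that fires on some other thread.
foreign = Thread.new do
  ran_on[:callback] = Thread.current
  actor.perform { ran_on[:performed] = Thread.current }
end
foreign.join
actor.shutdown

puts ran_on[:callback].equal?(foreign)       # true
puts ran_on[:performed].equal?(actor.thread) # true
```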

Blocking

Blocking futures are synchronous. The actor won't process any other events until the future has a result.

class ActorA < Tribe::Actor
  private
  def exception_handler(e)
    super
    puts concat_e("ActorA (#{identifier}) died.", e)
  end

  def on_start(event)
    friend = registry['actor_b']

    future = friend.enqueue_future(:compute, 10)

    future.wait # The current thread will sleep until a result is available.

    if future.success?
      puts "ActorA (#{identifier}) future result: #{future.result}"
    else
      puts "ActorA (#{identifier}) future failure: #{future.result}"
    end
  end
end

class ActorB < Tribe::Actor
  def exception_handler(e)
    super
    puts concat_e("ActorB (#{identifier}) died.", e)
  end

  def on_compute(event)
    return factorial(event.data)
  end

  def factorial(num)
    return 1 if num <= 0
    return num * factorial(num - 1)
  end
end

actor_a = ActorA.new(:name => 'actor_a')
actor_b = ActorB.new(:name => 'actor_b')

actor_a.enqueue(:start)

actor_a.enqueue(:shutdown)
actor_b.enqueue(:shutdown)

Futures and Performance

Futures have overhead associated with them. You should avoid them unless you are actually interested in the result.

You should also prefer non-blocking futures over blocking ones. This is because a blocking future causes the current actor (and thread) to sleep.

Tribe is designed specifically to support a large number of actors running on a small number of threads. Thus, you will run into performance and/or deadlock problems if too many actors are waiting at the same time.

If you choose to use blocking futures then it is highly recommended that you only use them with dedicated actors. Each dedicated actor runs in a separate thread (instead of a shared thread pool). The downside to using dedicated actors is that they consume more resources and you can't have as many of them.
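
The starvation problem is easy to reproduce with a pool of one thread. The sketch below uses only core Ruby and does not use Tribe or the Workers gem; the queue-based pool and all variable names are hypothetical. An outer task schedules an inner task on the same pool and then blocks waiting for it. Because the only worker is busy waiting, the inner task cannot start. The 0.2 second deadline exists only so that the example terminates; a wait without a timeout would hang here.

```ruby
# Hypothetical single-thread pool; NOT the Workers gem.
tasks = Queue.new
finished = Queue.new
events = []
inner_done = false

worker = Thread.new do
  while (task = tasks.pop) != :stop
    task.call
  end
end

# The outer task schedules the inner task on the same pool, then blocks.
tasks.push(lambda do
  tasks.push(lambda do
    inner_done = true
    events << :inner_ran
    finished.push(true)
  end)

  # Blocking wait with a deadline, so the sketch cannot hang forever.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 0.2
  sleep 0.01 until inner_done || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline

  events << (inner_done ? :outer_got_result : :outer_timed_out)
end)

finished.pop      # Wait until the inner task has finally run.
tasks.push(:stop) # Then let the worker exit.
worker.join

puts events.inspect # [:outer_timed_out, :inner_ran]
```

The inner task only runs after the outer one gives up. With more threads the same effect appears once every thread is occupied by a waiting task, which is why blocking futures fit dedicated actors better.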

TODO - missing features

  • Supervisors.
  • Linking.
  • Future timeouts.
  • Clustering.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request