Welcome to Starfish

Starfish is a utility to make distributed programming ridiculously easy.

Download

Background

Starfish was born by abstracting code from a highly popular website (mog.com). I had written distributed programming code for various tasks including database corruption cleanup as well as real-time indexing requirements and decided that nobody should have to write as much code as I did to do those tasks. The general idea of distributed programming is vastly underused by most programmers and Starfish hides everything you don’t have to worry about.

Usage

Make a file and define the server and client. What follows is a simple example:

# foo.rb
class Foo
  attr_reader :i

  def initialize
    @i = 0
  end

  def inc
    logger.info "YAY it incremented by 1 up to #{@i}"
    @i += 1
  end
end

server do |object|
  object = Foo.new
end

client do |object|
  object.inc
end

Notice that you need not specify any network code, nor learn a foreign API like DRb or Rinda. You need not even define logger, starfish takes care of it for you at runtime. You just tell it how to serve an object and how to process the object. To run this code, simply run:

starfish foo.rb

This starts a server AND a client to start processing that object. The client runs in a continuous loop. If you call starfish foo.rb subsequent times from either the local machine or any machine on the local network, more clients will start processing the server concurrently.

By default, the server will start a log file at /tmp/foo.rb.log. You can override this behaviour.

server :log => "/var/log/foo.log" do |object|
  object = Foo.new
end

The :log parameter is intelligent and can take various other formats including nil for no logging or instances of any kind of logger class you prefer to use.

MapReduce with ActiveRecord

The crown jewel of Starfish is actually a library built on Starfish called MapReduce. Inspired by Google’s MapReduce (en.wikipedia.org/wiki/MapReduce), it automates the process of divide and conquer for large data sets. Currently, Starfish’s implementation only works with the ActiveRecord ORM from Ruby on Rails. The idea is that you have a very large data set that would be unmanageable to process otherwise. For example, let’s say your database table has 30GB of information in it. It would be very difficult to keep 30GB of records in RAM at one time; even if you processed them serially, it would take a very long time. The idea is to have many clients grabbing chunks of the database at a time making much faster work, even on a single processor machine, than would otherwise be feasible.

Here is the basic code that will get you up and running with MapReduce in Starfish.

# item.rb
ActiveRecord::Base.establish_connection(
  :adapter  => "mysql",
  :host     => "localhost",
  :username => "root",
  :password => "",
  :database => "some_database"
)

class Item < ActiveRecord::Base; end

server do |map_reduce|
  map_reduce.type = Item
end

client do |item|
  logger.info item.id
end

Just like the previous example, just run:

starfish item.rb

Starfish takes care of the rest. The code above does the following:

  • The server grabs all the items via: Item.find(:all)

  • Each of the clients grab an item from the collection

  • When there are no more items to be grabbed, everything shuts down

However there are cases when you might want some fancier behaviour.

server do |map_reduce|
  map_reduce.type = Item
  map_reduce.conditions = ["some_important_flag = ?", 1]
  map_reduce.vigilant = true
  map_reduce.queue_size = 1000
end

Here are three new options: conditions, vigilant, and queue_size. Conditions simply limits what is grabbed via ActiveRecord. It is equivalent to saying: Item.find(:all, :conditions => [“some_important_flag = ?”, 1]). Vigilant simply keeps the clients alive and when Item.find(:all) runs out of items, it will vigilantly poll for newly created items periodically. Queue size specifies the number of items buffered in the server queue. In the case of the 30GB database table, you do not want to grab all 30GB at once, instead you grab 1000 at a time as needed by your clients.

Another options you have when you use the vigilant option is map_reduce.empty_queue_wait which specifies how long to wait between polls for new information.

There is yet a third main run-mode for MapReduce under ActiveRecord.

server do |map_reduce|
  map_reduce.type = Item
  map_reduce.rescan_when_complete = true
end

The events follow like this when the rescan_when_complete option is enabled:

  • The server grabs all the items via: Item.find(:all)

  • Each of the clients grab an item from the collection

  • When there are no more items to be grabbed, the queue is re-filled via Item.find(:all) and the process starts again

If you need to continuously check over data for integrity, this option will meet your demands.

Another bonus you get while using MapReduce is automatic statistics. Simply call:

starfish item.rb stats

And you will be returned a YAML hash of statistics.

--- 
time_spent_grabbing_objects: 0.090328
time_spent_processing_objects: 0.023431
time_began: 2006-08-16 09:30:32.926399 -07:00
num_queues_grabbed: 5
time_spent_grabbing_queues: 0.090328
num_objects_grabbed: 10

MapReduce with Files

You can also divide and conquer with files. The idea is that you have a large file and want to process line-by-line as quickly as possible. It is simple to accomplish this with Starfish.

server do |map_reduce|
  map_reduce.type = File
  map_reduce.input = "/tmp/big_log_file"
end

client do |line|
  if line =~ /some_regex/
    logger.info(line)
  end
end

Simply replacing the type and input lets you process your file in a distributed way. Like when using ActiveRecord, you also have options:

server do |map_reduce|
  map_reduce.type = File
  map_reduce.input = "/tmp/big_log_file"
  map_reduce.queue_size = 1000 # how many lines of the file to buffer at a time
  map_reduce.lines_per_client = 100 # how many lines each client will process at a time
  map_reduce.rescan_when_complete = true
end

For a file, rescan_when_complete goes back to the beginning of the file when you have finished processing it. You can also use the vigilant option to wait for new data to be added to the file.

Examples

See the examples/ directory.

Authors

This library is released under the terms of the BSD.