Pipeline Toolkit

by VisFleet

Command line tools for processing messages by constructing a pipeline of workers. AMQP and Unix pipes are used to construct the pipeline. Messages are simple Hashs (serialized as YAML) so they can hold any values and change throughout the processing.

Provides:

  • Processing acknowledgments, ensuring the a message is only disposed of once it has been successful processed

  • Performance. Messages are moved through the pipeline fast

  • Command line tools for:

    • Subscribing to messages from an AMQP queue

    • Pushing messages back onto an AMQP exchange

    • Monitoring performance (see msg_probe)

  • A base module (MessageCommand) to include into your own classes to quickly make workers.

Install

> gem sources -a http://gems.github.com
> sudo gem install visfleet-pipeline_toolkit

Dependancies

It is assumed that you have:

  • An AMQP msg server to pop and push messages to (e.g. www.rabbitmq.com/)

  • A *nix system, such as Linux or Mac OS X.

Usage

  1. Create your worker

class MyWorker
  include MessageCommand

  def process_message(msg)
    # do stuff here
    msg
  end
end

MyWorker.new.start
  1. Hook it up to the main pipe (i.e. the AMQP server)

> msg_subscribe.rb -q source | my_worker.rb | msg_push.rb -x dest

You can learn more about the command line tools and what options are available by using their help command.

> msg_subscriber --help
> msg_push --help
> msg_sink --help
> msg_probe --help