Pipeliner

The pipeliner library provides a framework for message passing between objects. Objects hook into the pipeline to receive certain types of objects and process them in any way they see fit. Processing runs on a thread pool, so messages are handled asynchronously.

install (easy):

gem install pipeliner

install (less easy):

git clone http://github.com/spox/pipeliner.git
cd pipeliner && gem build *.gemspec && gem install ./pipeliner-*.gem

install (less easy that’s a little easier):

rip makes it easy to install directly from a GitHub repository.

Gritty Details

The pipeliner library allows for some fun things, and here is where I’ll try to explain some of them. First, let’s take a look at a simple example of this library in action:

require 'pipeliner'

output = Array.new
pipeline = Pipeliner::Pipeline.new
pipeline.hook(String){|s| output << "#{s}: modified"}
pipeline << "string"
pipeline << 100
p output

=> ["string: modified"]

In the example above, we use a simple block to process any String type objects. Anything else placed in the pipeline is ignored by the block. We can also use objects instead of blocks:

require 'pipeliner'

class Tester
    attr_reader :store
    def initialize
        @store = []
    end
    def process(m)
        @store << m
    end
end

pipeline = Pipeliner::Pipeline.new
obj = Tester.new
pipeline.hook(Object, obj, :process)
pipeline << "String"
pipeline << 100
pipeline << {:test => :data}
p obj.store

=> ["String", 100, {:test=>:data}]
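Since processing happens on a thread pool, a hooked processor may not have finished by the time the statement after a `pipeline << ...` call runs. The sketch below is plain Ruby and does not use pipeliner at all; `TinyPool` is a hypothetical name made up for illustration, not the library's implementation. It only shows the general idea of handing work to worker threads and waiting for them before reading the results:

```ruby
require 'thread'

# Hypothetical helper for illustration only; not part of pipeliner.
class TinyPool
    def initialize(size = 2)
        @queue = Queue.new
        @workers = Array.new(size) do
            Thread.new do
                # Each worker pulls jobs until it sees the :stop marker.
                while (job = @queue.pop) != :stop
                    job.call
                end
            end
        end
    end

    # Schedule a block to run on one of the workers.
    def schedule(&block)
        @queue << block
    end

    # Queue one :stop marker per worker, then wait for all of them.
    # Jobs queued earlier are processed first because the queue is FIFO.
    def shutdown
        @workers.size.times { @queue << :stop }
        @workers.each(&:join)
    end
end

results = Queue.new   # thread-safe container for the workers' output
pool = TinyPool.new(2)
%w[a b c].each { |s| pool.schedule { results << "#{s}: modified" } }
pool.shutdown         # wait for the workers before reading results

collected = []
collected << results.pop until results.empty?
p collected.sort      # sorted, because completion order is not guaranteed
```

The results are sorted before printing because the order in which workers finish is not deterministic.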

Another handy feature is limiting when a hook is called if a very generic type is hooked. When supplying an object and method to call, we can also pass a block that provides the condition under which the hook will run:

require 'pipeliner'

class Tester
    attr_reader :store
    def initialize
        @store = []
    end
    def process(m)
        @store << m
    end
end

pipeline = Pipeliner::Pipeline.new
obj = Tester.new
pipeline.hook(Object, obj, :process){|o| o.is_a?(String) }
pipeline << "String"
pipeline << 100
pipeline << {:test => :data}
p obj.store

=> ["String"]
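To make the matching rules a little more concrete, here is a rough, synchronous sketch of how type-based hooks with an optional condition could be dispatched. This is not pipeliner's actual code: `TinyDispatcher`, `Hook`, and `Collector` are hypothetical names, and the real library runs hooks on a thread pool rather than inline.

```ruby
# Hypothetical stand-in for illustration only; not pipeliner's implementation.
class TinyDispatcher
    Hook = Struct.new(:type, :callable, :condition)

    def initialize
        @hooks = []
    end

    # Register either a block handler, or an object plus method name with an
    # optional block acting as the condition (mirroring the calls shown above).
    def hook(type, object = nil, method_name = nil, &block)
        if object
            @hooks << Hook.new(type, object.method(method_name), block)
        else
            raise ArgumentError, 'a block or an object is required' unless block
            @hooks << Hook.new(type, block, nil)
        end
        self
    end

    # Deliver a message to every hook whose type matches and whose
    # condition (if any) returns a truthy value.
    def <<(message)
        @hooks.each do |h|
            next unless message.is_a?(h.type)
            next if h.condition && !h.condition.call(message)
            h.callable.call(message)
        end
        self
    end
end

class Collector
    attr_reader :store
    def initialize
        @store = []
    end
    def process(m)
        @store << m
    end
end

dispatcher = TinyDispatcher.new
all = Collector.new
strings = Collector.new
dispatcher.hook(Object, all, :process)
dispatcher.hook(Object, strings, :process){|o| o.is_a?(String) }
dispatcher << "String" << 100
p all.store      # => ["String", 100]
p strings.store  # => ["String"]
```

Matching with `is_a?` is what lets a hook on Object see everything, while the condition block narrows it back down.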

But what if you want anything added to the pipeline to be modified before it is distributed to hooked processors? Then you would use a filter:

require 'pipeliner'

output = []
pipeline = Pipeliner::Pipeline.new
pipeline.filters.add(String){|s| "#{s} -> filtered"}
pipeline.hook(Object){|s| output << s}
pipeline << "string"
p output

=> ["string -> filtered"]

Filters allow objects to be modified before reaching any hooked processors, and can even remove an unwanted object from the pipeline by returning nil. Here is an example that removes all Symbols:

require 'pipeliner'

output = []
pipeline = Pipeliner::Pipeline.new
pipeline.filters.add(Symbol){|s| nil}
pipeline.hook(Object){|s| output << s}
pipeline << "string"
pipeline << 100
pipeline << :test
p output

=> ["string", 100]

Filters can also be created by subclassing the Filter object and defining do_filter:

require 'pipeliner'

class MyFilter < Pipeliner::Filter
    def do_filter(m)
        "#{m} - filtered"
    end
end
output = []
pipeline = Pipeliner::Pipeline.new
filter = MyFilter.new(String)
pipeline.filters.add(String, filter)
pipeline.hook(Object){|s| output << s}
pipeline << "string"
pipeline << 100
p output

=> ["string - filtered", 100]
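For the curious, the filtering behaviour shown in these examples can be approximated in a few lines of plain Ruby. The sketch below is only a guess at the general shape and not the library's implementation; `TinyFilters` and its `apply` method are hypothetical names. It assumes filters run in the order they were added and that a nil result drops the message.

```ruby
# Hypothetical sketch for illustration only; not pipeliner's code.
class TinyFilters
    def initialize
        @filters = []
    end

    # Register a block to run on every message of the given type.
    def add(type, &block)
        @filters << [type, block]
        self
    end

    # Run a message through every matching filter in order.
    # Returns the filtered message, or nil if a filter dropped it.
    def apply(message)
        @filters.each do |type, block|
            next unless message.is_a?(type)
            message = block.call(message)
            return nil if message.nil?
        end
        message
    end
end

filters = TinyFilters.new
filters.add(String){|s| "#{s} -> filtered"}
filters.add(Symbol){|_s| nil}

output = []
["string", 100, :test].each do |m|
    result = filters.apply(m)
    output << result unless result.nil?   # dropped messages never reach hooks
end
p output  # => ["string -> filtered", 100]
```

One consequence of using nil as the "drop" signal in this sketch is that nil itself cannot be passed through as a message.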

Last remarks

If you find any bugs, please report them through GitHub. If you need any help, you can generally find me on DALnet and Freenode.

License

Pipeliner is licensed under the MIT License
Copyright (c) 2009 spox <[email protected]>