Class: Pipeliner::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/pipeliner/Pipeline.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Pipeline

args

Hash containing initializations

:pool =>

ActionPool::Pool for the Pipeline to use

:filters =>

FilterManager for the Pipeline to use

Create a new Pipeline



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/pipeliner/Pipeline.rb', line 13

# Create a new Pipeline
#
# args:: Hash containing initializations
#   :pool    => ActionPool::Pool for the Pipeline to use
#   :filters => FilterManager for the Pipeline to use
#
# When no :pool is supplied, a new ActionPool::Pool is created and an
# at_exit handler is registered that calls #close on this pipeline.
def initialize(args={})
    given_pool = args[:pool]
    given_filters = args[:filters]
    @pool = given_pool || ActionPool::Pool.new
    @hooks = {}
    @lock = Mutex.new
    @filters = given_filters || FilterManager.new
    # Only a pool created here (not one handed in by the caller) gets the
    # automatic close at interpreter exit.
    Kernel.at_exit{ close } unless given_pool
end

Instance Method Details

#<<(object) ⇒ Object

object

Object to send down the pipeline

Send an object down the pipeline

Raises:

  • (StandardError)


146
147
148
149
150
151
152
153
# File 'lib/pipeliner/Pipeline.rb', line 146

# object:: Object to send down the pipeline
#
# Runs the object through the filters; if the filters return a truthy
# value, that value is frozen and handed to the pool, which calls flush
# with it. Returns nil when the filters reject the object.
# Raises StandardError when the pipeline has no pool.
def <<(object)
    raise StandardError, 'Pipeline is currently closed' if @pool.nil?
    filtered = @filters.apply_filters(object)
    # A nil/false result from the filters means the object is dropped.
    return unless filtered
    filtered.freeze
    @pool.process{ flush(filtered) }
end

#clear ⇒ Object

Remove all hooks from the pipeline



140
141
142
# File 'lib/pipeliner/Pipeline.rb', line 140

# Remove all hooks from the pipeline
# The hooks hash is emptied while holding the pipeline lock.
def clear
    @lock.synchronize do
        @hooks.clear
    end
end

#close ⇒ Object

Close the pipeline.

Note: This is important to call at the end of a script when not
providing an ActionPool thread pool to the pipeline.
This will ensure the thread pool is properly shut down
and avoid the script hanging.


30
31
32
33
# File 'lib/pipeliner/Pipeline.rb', line 30

# Close the pipeline
# Shuts down the current pool (if any), forgets it, and removes all hooks.
#
# Dropping the pool reference is what marks the pipeline as closed:
# #<< raises while @pool is nil and #open only builds a new pool when
# @pool is unset. Calling close again (for example from the at_exit
# handler after a manual close) is a no-op for the pool.
def close
    pool = @pool
    # Forget the pool before shutting it down so the pipeline already
    # reports itself as closed while the shutdown runs.
    @pool = nil
    pool.shutdown unless pool.nil?
    clear
end

#filters ⇒ Object

Returns current FilterManager



41
42
43
# File 'lib/pipeliner/Pipeline.rb', line 41

# Returns the FilterManager currently assigned to this pipeline
# (the value held in @filters).
def filters
    @filters
end

#filters=(fm) ⇒ Object

fm

FilterManager

Set the FilterManager the Pipeline should use

Raises:

  • (ArgumentError)


47
48
49
50
# File 'lib/pipeliner/Pipeline.rb', line 47

# fm:: FilterManager
#
# Set the FilterManager the Pipeline should use
# Raises ArgumentError when fm is not a FilterManager.
def filters=(fm)
    unless fm.is_a?(FilterManager)
        raise ArgumentError, 'Expecting a FilterManager'
    end
    @filters = fm
end

#hook(type, object = nil, method = nil, &block) ⇒ Object

type

Type of Objects to pass to object

object

Object to hook to pipeline

method

Method to call on object

block

Block to call with each matching object (when object and method are not set), or a conditional that decides whether the hooked method is called

Hooks an Object into the pipeline for objects of a given type. The block can serve two purposes here. First, we can hook a block to a type like so:

pipeline.hook(String){|s| puts s }

Or, we can use the block as a conditional for calling an object’s method:

pipeline.hook(String, obj, :method){|s| s == 'test' }

In the second example, this hook will only be called if the String type object matches the conditional in the block, meaning the string must be ‘test’



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/pipeliner/Pipeline.rb', line 63

# type:: Type of Objects to pass to object
# object:: Object to hook to pipeline
# method:: Method to call on object
# block:: Block to register (when object and method are not set) or
#         conditional stored alongside the object hook
#
# Hooks an Object into the pipeline for objects of a given type.
# Two forms are accepted:
#
#   pipeline.hook(String){|s| puts s }
#   pipeline.hook(String, obj, :method){|s| s == 'test' }
#
# In the first form the block is stored under :procs for the type. In
# the second form an entry holding the object, the method name and the
# block (or an always-true lambda when no block is given) is stored
# under the object's class.
#
# Returns the block when one was given, otherwise nil.
# Raises ArgumentError for a missing type, a block that does not accept
# exactly one required parameter, an object without a method (or the
# reverse), no object and no block, or an object that does not respond
# to the given method.
def hook(type, object=nil, method=nil, &block)
    raise ArgumentError.new('Type must be provided') if type.nil?
    if(block && (block.arity > 1 || block.arity == 0))
        raise ArgumentError.new('Block must accept a parameter')
    end
    if((object && method.nil?) || (object.nil? && method))
        raise ArgumentError.new('Object AND method must be provided')
    end
    proc_hook = object.nil? && method.nil?
    if(proc_hook && !block_given?)
        raise ArgumentError.new('No object information or block provided for hook')
    end
    # Validate the object/method pair before touching @hooks so that a
    # rejected registration cannot leave an empty entry for the type.
    unless(proc_hook)
        method = method.to_sym
        raise ArgumentError.new('Given object does not respond to given method') unless object.respond_to?(method)
    end
    @lock.synchronize do
        # Use the constant returned by Splib.find_const when it returns one.
        const = Splib.find_const(type)
        type = const unless const.nil?
        registered = (@hooks[type] ||= {})
        if(proc_hook)
            (registered[:procs] ||= []) << block
        else
            # Object hooks are grouped under the class of the hooked object.
            (registered[object.class] ||= []) << {:object => object, :method => method, :req => block || lambda{|x|true}}
        end
    end
    block
end

#hooks ⇒ Object

Return current hooks hash



135
136
137
# File 'lib/pipeliner/Pipeline.rb', line 135

# Return current hooks hash
# The result is a dup of the top-level hash taken under the pipeline
# lock; the values inside it are the same objects the pipeline holds.
def hooks
    snapshot = nil
    @lock.synchronize do
        snapshot = @hooks.dup
    end
    snapshot
end

#open ⇒ Object

Open the pipeline



36
37
38
# File 'lib/pipeliner/Pipeline.rb', line 36

# Open the pipeline
# Creates a new ActionPool::Pool when the pipeline has none and returns
# it; returns nil and leaves the existing pool untouched otherwise.
def open
    return nil if @pool
    @pool = ActionPool::Pool.new
end

#unhook(object, type = nil, method = nil) ⇒ Object

object

Object or Proc to unhook from the pipeline

type

Type of Objects being received

method

method registered to call

Remove a hook from the pipeline. If the method and type are not specified, the given object will be removed from all hooks

Raises:

  • (ArgumentError)


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/pipeliner/Pipeline.rb', line 97

# object:: Object or Proc to unhook from the pipeline
# type:: Type of Objects being received
# method:: method registered to call
#
# Remove a hook from the pipeline.
#
# Proc: the proc is removed from the :procs list of the given type, or
# of every type when no type is given. Types without any :procs list
# are skipped.
#
# Other objects: hooks are stored under the object's class, so removal
# matches on object.class. With neither type nor method, that class is
# removed from every type. With a type, the entries for that class (or
# only those registered for the given method) are removed from that type.
#
# A given type is resolved through Splib.find_const in the same way as
# in #hook. Type entries left empty are dropped from the hooks hash.
#
# Raises ArgumentError when a method is given for a Proc, or a method is
# given without a type; NameError when the given type has no hooks;
# StandardError when the type has no hooks for the object's class.
def unhook(object, type=nil, method=nil)
    raise ArgumentError.new('Object method provided for a Proc') if object.is_a?(Proc) && method
    @lock.synchronize do
        unless(type.nil?)
            # Resolve the type the same way #hook does so lookups hit the
            # key that was used at registration.
            const = Splib.find_const(type)
            type = const unless const.nil?
        end
        case object
        when Proc
            if(type)
                raise NameError.new('Uninitialized hook type given') unless @hooks[type]
                targets = [@hooks[type]]
            else
                targets = @hooks.values
            end
            targets.each do |registered|
                procs = registered[:procs]
                # Types that only carry object hooks have no :procs list.
                next if procs.nil?
                procs.delete(object)
                registered.delete(:procs) if procs.empty?
            end
        else
            name = object.class
            if(method.nil? && type.nil?)
                @hooks.each_value{|registered| registered.delete(name)}
            else
                raise ArgumentError.new('Type must be provided') if type.nil?
                method = method.to_sym if method
                raise NameError.new('Uninitialized hook type given') unless @hooks[type]
                raise StandardError.new('No hooks found for given object') unless @hooks[type][name]
                if(method)
                    @hooks[type][name].delete_if{|entry| entry[:method] == method}
                    @hooks[type].delete(name) if @hooks[type][name].empty?
                else
                    @hooks[type].delete(name)
                end
            end
        end
        # Drop any type whose last hook was just removed.
        @hooks.delete_if{|k,v|v.empty?}
    end
end