Class: RFlow::Components::RubyProcFilter

Inherits:
RFlow::Component
Defined in:
lib/rflow/components/ruby_proc_filter.rb

Overview

Component that filters messages using a Ruby predicate defined in the RFlow config file. Each inbound message is sent out #filtered if the predicate returns a truthy value, #dropped if it returns a falsey value, or #errored if the predicate raises an exception.

Accepts the config parameter filter_proc_string, which is the text of the body of a lambda that receives a single argument named message. For example, message.data.data_object > 2.
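As a rough illustration of how such a string behaves as a predicate, the sketch below wraps it in a lambda the same way configure! does and calls it with stand-in objects. FakeData, FakeMessage, and build_filter are hypothetical names introduced only for this example; they are not part of RFlow, and real messages carry more than a data_object.

```ruby
# Hypothetical stand-ins for RFlow's message types (illustration only).
FakeData = Struct.new(:data_object)
FakeMessage = Struct.new(:data)

# Hypothetical helper: wraps the configured string as the body of a
# one-argument lambda, as configure! does.
def build_filter(filter_proc_string)
  eval("lambda {|message| #{filter_proc_string} }")
end

filter = build_filter('message.data.data_object > 2')

# A truthy result means the message would go out #filtered.
puts filter.call(FakeMessage.new(FakeData.new(5)))
# A falsey result means the message would go out #dropped.
puts filter.call(FakeMessage.new(FakeData.new(1)))
```

Because the string is evaluated as Ruby code, it should only come from a trusted config file.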

Instance Attribute Summary

Attributes inherited from RFlow::Component

#name, #ports, #shard, #uuid, #worker

Instance Method Summary

Methods inherited from RFlow::Component

build, #cleanup!, #initialize, input_port, #input_ports, output_port, #output_ports, #run!, #shutdown!, #to_s

Constructor Details

This class inherits a constructor from RFlow::Component

Instance Attribute Details

#dropped ⇒ Component::OutputPort (readonly)

Outputs Messages that do not pass the filter predicate.



# File 'lib/rflow/components/ruby_proc_filter.rb', line 22

output_port :dropped

#errored ⇒ Component::OutputPort (readonly)

Outputs Messages for which the filter predicate raises an exception.



# File 'lib/rflow/components/ruby_proc_filter.rb', line 26

output_port :errored

#filtered ⇒ Component::OutputPort (readonly)

Outputs Messages that pass the filter predicate.



# File 'lib/rflow/components/ruby_proc_filter.rb', line 18

output_port :filtered

#in ⇒ Component::InputPort (readonly)

Receives Messages.



# File 'lib/rflow/components/ruby_proc_filter.rb', line 14

input_port :in

Instance Method Details

#configure!(config) ⇒ void

This method returns an undefined value.

RFlow-called method at startup.

Parameters:

  • config (Hash)

    configuration from the RFlow config file



# File 'lib/rflow/components/ruby_proc_filter.rb', line 31

def configure!(config)
  @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }")
end
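Two consequences of interpolating the string into a lambda body can be sketched in isolation. The configured text may contain several statements, and a malformed string fails when the lambda is built rather than once per message. compile_predicate is a hypothetical helper that repeats the eval from the listing above; plain strings stand in for messages.

```ruby
# Hypothetical helper repeating the wrapping done in configure!.
def compile_predicate(filter_proc_string)
  eval("lambda {|message| #{filter_proc_string} }")
end

# Several statements are allowed; the last expression is the predicate result.
multi = compile_predicate('n = message.to_i; n.even? && n > 10')
puts multi.call('12')
puts multi.call('7')

# A malformed body raises SyntaxError when the lambda is built.
begin
  compile_predicate('message >')
rescue SyntaxError => e
  puts "rejected while building the lambda: #{e.class}"
end
```

In this sketch the failure surfaces from the eval call itself, so in the component it would presumably surface from configure! at startup rather than from process_message.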

#process_message(input_port, input_port_key, connection, message) ⇒ void

This method returns an undefined value.

RFlow-called method on message arrival.



# File 'lib/rflow/components/ruby_proc_filter.rb', line 37

def process_message(input_port, input_port_key, connection, message)
  begin
    if @filter_proc.call(message)
      filtered.send_message message
    else
      dropped.send_message message
    end
  rescue Exception => e
    RFlow.logger.debug "#{self.class} Message caused exception: #{e.class}: #{e.message}: #{e.backtrace}"
    errored.send_message message
  end
end
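The three-way routing can be exercised without RFlow using a small stand-in. This is a sketch under stated assumptions, not the component itself: RecordingPort and SketchFilter are hypothetical classes, process_message takes only the message, and the sketch rescues StandardError, which is narrower than the `rescue Exception` in the listing above.

```ruby
# Hypothetical stand-in for an output port: records what it is sent.
class RecordingPort
  attr_reader :messages

  def initialize
    @messages = []
  end

  def send_message(message)
    @messages << message
  end
end

# Hypothetical stand-in for the component, mirroring its branching.
class SketchFilter
  attr_reader :filtered, :dropped, :errored

  def initialize(filter_proc)
    @filter_proc = filter_proc
    @filtered = RecordingPort.new
    @dropped = RecordingPort.new
    @errored = RecordingPort.new
  end

  def process_message(message)
    if @filter_proc.call(message)
      filtered.send_message message
    else
      dropped.send_message message
    end
  rescue StandardError
    # Assumption: narrower than the listing, which rescues Exception.
    errored.send_message message
  end
end

sketch = SketchFilter.new(lambda { |message| message > 2 })
# 5 passes, 1 fails, and nil raises NoMethodError inside the predicate.
[5, 1, nil].each { |m| sketch.process_message(m) }

p sketch.filtered.messages  # => [5]
p sketch.dropped.messages   # => [1]
p sketch.errored.messages   # => [nil]
```

In every branch the original message is forwarded unchanged; only the port it leaves on differs.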