Class: Wukong::Processor
- Inherits: Hanuman::Stage
  - Object
  - Hanuman::Stage
  - Wukong::Processor
- Includes: Logging
- Defined in: lib/wukong/processor.rb,
lib/wukong/widget/echo.rb,
lib/wukong/widget/logger.rb,
lib/wukong/widget/extract.rb,
lib/wukong/widget/filters.rb,
lib/wukong/widget/operators.rb,
lib/wukong/widget/serializers.rb,
lib/wukong/widget/reducers/bin.rb,
lib/wukong/widget/reducers/sort.rb,
lib/wukong/widget/reducers/uniq.rb,
lib/wukong/widget/reducers/count.rb,
lib/wukong/widget/reducers/group.rb,
lib/wukong/widget/reducers/moments.rb,
lib/wukong/widget/reducers/improver.rb,
lib/wukong/widget/reducers/join_xml.rb,
lib/wukong/widget/reducers/accumulator.rb,
lib/wukong/widget/reducers/group_concat.rb
Overview
The Processor is the basic unit of computation in Wukong. A processor can be thought of as an arbitrary function that takes certain inputs and produces certain (or no) outputs.
A Processor can be written and tested purely in Ruby on your local machine, and processors can be glued together into larger dataflows.
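To make the "function with zero or more outputs" idea concrete, here is a minimal plain-Ruby sketch of a processor-like object and a driver that runs a chain of them. It does not use Wukong: ToyProcessor, SplitWords, Upcase, and run are hypothetical stand-ins that only mirror the hooks documented on this page (setup, process, finalize, stop), not Wukong's real runner.

```ruby
# Hypothetical stand-in for Wukong::Processor; mirrors the documented hooks only.
class ToyProcessor
  def setup; end

  # Default behavior: pass each record through unchanged.
  def process(record)
    yield record
  end

  def finalize; end

  def stop; end
end

# Yields zero or more words per input line.
class SplitWords < ToyProcessor
  def process(line)
    line.split.each { |word| yield word }
  end
end

# Yields exactly one output per input.
class Upcase < ToyProcessor
  def process(word)
    yield word.upcase
  end
end

# Hypothetical driver: pushes every input through a non-empty chain of
# processors and collects whatever the last one yields.
def run(processors, inputs)
  output = []
  # downstream[i] receives the records emitted by processors[i].
  downstream = Array.new(processors.size)
  downstream[-1] = ->(record) { output << record }
  (processors.size - 2).downto(0) do |i|
    nxt = processors[i + 1]
    after = downstream[i + 1]
    downstream[i] = ->(record) { nxt.process(record) { |out| after.call(out) } }
  end

  processors.each(&:setup)
  inputs.each do |record|
    processors.first.process(record) { |out| downstream[0].call(out) }
  end
  # Finalize in order so anything flushed upstream still reaches later stages.
  processors.each_with_index do |processor, i|
    processor.finalize { |out| downstream[i].call(out) }
  end
  processors.each(&:stop)
  output
end

p run([SplitWords.new, Upcase.new], ["hello big world", "", "ok"])
# prints ["HELLO", "BIG", "WORLD", "OK"]
```

The empty line produces no output at all, which is the "(or no) outputs" case: results are delivered through the block, not through the return value.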
Direct Known Subclasses
Accumulator, Echo, Extract, Filter, Flatten, Improver, JoinXML, Logger, Map, Serializer
Defined Under Namespace
Classes: Accumulator, Bin, Compact, Count, Echo, Extract, Filter, Flatten, FromCsv, FromDelimited, FromJson, FromTsv, Group, GroupConcat, Head, Identity, Improver, JoinXML, Limit, Logger, Map, Moments, NotRegexpFilter, Null, Recordize, RegexpFilter, Reject, Sample, Select, Serializer, Sort, Tail, ToCsv, ToDelimited, ToInspect, ToJson, ToTsv, Uniq
Constant Summary
- SerializerError = Class.new(Error)
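Class.new(Error) creates an anonymous subclass of Error, and assigning it to a constant gives the class its name. A minimal sketch of the idiom follows; the Demo module, the parse_json helper, and the choice of StandardError as the parent of Error are assumptions for illustration, since this page does not show how Wukong's own Error is defined.

```ruby
module Demo
  # Assumption: a library-wide base error; Wukong's actual Error may differ.
  Error = Class.new(StandardError)

  # Same idiom as SerializerError = Class.new(Error): the anonymous class
  # takes its name ("Demo::SerializerError") from the constant it is bound to.
  SerializerError = Class.new(Error)

  # Hypothetical helper that raises the specific error.
  def self.parse_json(line)
    raise SerializerError, "could not parse: #{line.inspect}" unless line.start_with?("{")
    line
  end
end

# Callers can rescue either the specific error or the library-wide base class.
begin
  Demo.parse_json("not json")
rescue Demo::Error => e
  puts "#{e.class}: #{e.message}"
end
```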
Instance Attribute Summary
Attributes included from Hanuman::StageInstanceMethods
Class Method Summary
Instance Method Summary
- #finalize {|record| ... } ⇒ Object
  Called to signal that the last record has been received; further processing may still be done and records may still be yielded.
- #perform_action(*args) {|record| ... } ⇒ Object
  When the processor is instantiated with a block, the block replaces this method.
- #process(record) {|record| ... } ⇒ Object
  Called once per record.
- #receive_action(action) ⇒ Object
  Internal (:nodoc:); turns the action attribute into the #perform_action method.
- #setup ⇒ Object
  Called after the processor has been instantiated but before it is given any records to process.
- #stop ⇒ Object
  Called after all records have been passed to the processor.
Methods included from Logging
Methods inherited from Hanuman::Stage
Methods included from Hanuman::StageClassMethods
#builder, #label, #register, #set_builder
Methods included from Hanuman::StageInstanceMethods
#add_link, #linkable_name, #root
Class Method Details
.configure(settings) ⇒ Object
# File 'lib/wukong/processor.rb', line 26
def configure(settings)
  settings.description = description if description
  fields.each_pair do |name, field|
    next if field.doc == false || field.doc.to_s == 'false'
    next if [:log].include?(name)
    field_props = {}.tap do |props|
      props[:description] = field.doc unless field.doc == "#{name} field"
      field_type = (field.type.respond_to?(:product) ? field.type.product : field.type)
      configliere_type = case field_type
                         when String                then nil
                         when TrueClass, FalseClass then :boolean
                         else field_type
                         end
      props[:type]    = configliere_type if configliere_type
      props[:default] = field.default    if field.default
    end
    existing_value = settings[name]
    settings.define(name, field_props)
    settings[name] = existing_value unless existing_value.nil?
  end
end
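configure walks the fields declared on the processor class and turns each one into a settings definition: fields documented as false and the :log field are skipped, an auto-generated "<name> field" doc is not used as a description, and any value already present in settings is restored after define. The sketch below reproduces that control flow with plain-Ruby stand-ins. Field is a hypothetical Struct standing in for a Gorillib field, ToySettings is a hypothetical stand-in for Configliere settings, and the Configliere-specific type mapping is deliberately omitted (the field's type is passed through unchanged).

```ruby
# Hypothetical stand-in for a field declared on a processor class.
Field = Struct.new(:name, :type, :doc, :default)

# Hypothetical stand-in for Configliere settings: current values plus definitions.
class ToySettings
  attr_reader :definitions

  def initialize(values = {})
    @values = values
    @definitions = {}
  end

  def [](name)
    @values[name]
  end

  def []=(name, value)
    @values[name] = value
  end

  # Assumption for illustration: defining a setting resets its value to the
  # default, which is why the caller saves and restores any existing value.
  def define(name, props)
    @definitions[name] = props
    @values[name] = props[:default]
  end
end

def configure_fields(fields, settings)
  fields.each do |field|
    name = field.name
    # Fields documented as false, and the :log field, are not exposed.
    next if field.doc == false || field.doc.to_s == "false"
    next if name == :log

    props = {}
    # An auto-generated "<name> field" doc is not a useful description.
    props[:description] = field.doc unless field.doc == "#{name} field"
    props[:type]        = field.type if field.type
    props[:default]     = field.default if field.default

    # Keep a value the user already supplied.
    existing = settings[name]
    settings.define(name, props)
    settings[name] = existing unless existing.nil?
  end
  settings
end

fields = [
  Field.new(:limit, Integer, "max records", 10),
  Field.new(:sep, String, "sep field", nil),
  Field.new(:secret, String, false, nil),
  Field.new(:log, Hash, "logger", nil),
]
settings = configure_fields(fields, ToySettings.new({ limit: 3 }))
p settings[:limit]          # prints 3 (the pre-existing value survives define)
p settings.definitions.keys # prints [:limit, :sep]
```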
.description(desc = nil) ⇒ Object
# File 'lib/wukong/processor.rb', line 21
def description(desc = nil)
  @description = desc if desc
  @description
end
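description is a combined class-level reader and writer: called with an argument it stores the text, called without one it returns what was stored. The sketch reproduces that shape on a hypothetical Widget base class. Because @description is an instance variable of each class object, a subclass keeps its own value and does not inherit its parent's.

```ruby
class Widget
  # Same shape as Processor.description: write when given an argument, then read.
  def self.description(desc = nil)
    @description = desc if desc
    @description
  end
end

# Hypothetical subclasses.
class Tokenizer < Widget
  description "Splits lines into words"
end

class Passthrough < Widget
end

p Tokenizer.description   # prints "Splits lines into words"
p Passthrough.description # prints nil
```

One consequence of this design is that passing nil cannot clear a description once it has been set; the call simply reads the current value.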
Instance Method Details
#finalize {|record| ... } ⇒ Object
This method is called to signal that the last record has been received, although further processing may still be done and records may still be yielded.
This can be used within an aggregating processor (like a reducer in a map/reduce job) to start processing the final aggregate of records, since the "last record" has already been received.
Override this method in your subclass.
# File 'lib/wukong/processor.rb', line 99
def finalize
end
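An aggregating processor can accumulate state in process without yielding anything, then emit the aggregate from finalize once the last record has arrived. The sketch below illustrates that pattern in plain Ruby; WordCounter is hypothetical and does not depend on Wukong, and it is not the code of Wukong's own Count reducer.

```ruby
# Hypothetical aggregating processor.
class WordCounter
  def setup
    @counts = Hash.new(0)
  end

  # Accumulate only; nothing is yielded per record.
  def process(word)
    @counts[word] += 1
  end

  # The last record has been seen, so emit the aggregate, sorted by word.
  def finalize
    @counts.sort.each { |word, count| yield [word, count] }
  end
end

counter = WordCounter.new
counter.setup
%w[b a b c b].each { |w| counter.process(w) }
results = []
counter.finalize { |pair| results << pair }
p results # prints [["a", 1], ["b", 3], ["c", 1]]
```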
#perform_action(*args) {|record| ... } ⇒ Object
When instantiated with a block, the block will replace this method.
# File 'lib/wukong/processor.rb', line 57
def perform_action(*args)
end
#process(record) {|record| ... } ⇒ Object
This method is called once per record.
Override this method in your subclass.
# File 'lib/wukong/processor.rb', line 83
def process(record, &emit)
  yield record
end
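The default process is the identity: it yields each record unchanged. Since outputs go through the block rather than the return value, an override can yield zero, one, or many records per input (the &emit parameter captures the same block that yield invokes). Two hypothetical classes illustrate the filter-like and flatten-like cases in plain Ruby, without depending on Wukong.

```ruby
# Filter-like: yields the record only when it matches.
class EvenOnly
  def process(record)
    yield record if record.even?
  end
end

# Flatten-like: yields each element of an array record separately.
class Explode
  def process(record)
    record.each { |item| yield item }
  end
end

evens = []
filter = EvenOnly.new
[1, 2, 3, 4].each { |n| filter.process(n) { |out| evens << out } }

items = []
Explode.new.process([:a, :b, :c]) { |out| items << out }

p evens # prints [2, 4]
p items # prints [:a, :b, :c]
```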
#receive_action(action) ⇒ Object
Internal method (marked :nodoc: in the source).
Turns the action attribute into the #perform_action method by defining the action as a singleton method on this instance.
# File 'lib/wukong/processor.rb', line 65
def receive_action(action)
  self.define_singleton_method(:perform_action, &action)
end
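define_singleton_method replaces perform_action on one object only, so the class definition and every other instance keep the default no-op. A minimal sketch with a hypothetical Action class that mirrors the two methods shown here:

```ruby
class Action
  # Default, as in Processor#perform_action: do nothing.
  def perform_action(*args)
  end

  # Mirrors receive_action: the callable becomes this instance's perform_action.
  def receive_action(action)
    define_singleton_method(:perform_action, &action)
  end
end

doubler = Action.new
doubler.receive_action(->(x) { x * 2 })
plain = Action.new

p doubler.perform_action(21) # prints 42
p plain.perform_action(21)   # prints nil
```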
#setup ⇒ Object
This method is called after the processor class has been instantiated but before any records are given to it to process.
Override this method in your subclass.
# File 'lib/wukong/processor.rb', line 73
def setup
end
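Because setup runs after the processor has been instantiated but before the first record, it suits one-time preparation that depends on configuration, such as compiling a pattern once instead of once per record. In the sketch below, GrepLike is hypothetical, and assigning pattern through an accessor after construction is an assumption standing in for however Wukong populates a processor's fields.

```ruby
class GrepLike
  attr_accessor :pattern

  # Compile once, after configuration and before any records arrive.
  def setup
    @regexp = Regexp.new(pattern)
  end

  def process(line)
    yield line if @regexp.match?(line)
  end
end

grep = GrepLike.new
grep.pattern = "^err"
grep.setup
matches = []
["error: disk", "ok", "err: net"].each { |l| grep.process(l) { |out| matches << out } }
p matches # prints ["error: disk", "err: net"]
```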
#stop ⇒ Object
This method is called after all records have been passed. It signals that processing should stop.
Override this method in your subclass.
# File 'lib/wukong/processor.rb', line 106
def stop
end
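stop is the natural counterpart of setup: a place to flush and release anything opened there. The sketch uses a hypothetical TeeToBuffer class that copies every record to an IO opened in setup (a StringIO, so no files are needed) and closes it in stop.

```ruby
require "stringio"

class TeeToBuffer
  attr_reader :io

  def setup
    @io = StringIO.new
  end

  # Pass records through unchanged while also recording them.
  def process(record)
    @io.puts(record)
    yield record
  end

  # No more records will arrive, so release the resource.
  def stop
    @io.close
  end
end

tee = TeeToBuffer.new
tee.setup
out = []
%w[x y].each { |r| tee.process(r) { |o| out << o } }
written = tee.io.string
tee.stop
p written         # prints "x\ny\n"
p tee.io.closed?  # prints true
```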