Class: Wukong::Processor
- Inherits: Hanuman::Stage
  - Object
    - Hanuman::Stage
      - Wukong::Processor
- Includes: Vayacondios::Notifications, Logging
- Defined in:
  - lib/wukong/processor.rb
  - lib/wukong/widget/filters.rb
  - lib/wukong/widget/processors.rb
  - lib/wukong/widget/serializers.rb
  - lib/wukong/widget/reducers/bin.rb
  - lib/wukong/widget/reducers/sort.rb
  - lib/wukong/widget/reducers/uniq.rb
  - lib/wukong/widget/reducers/count.rb
  - lib/wukong/widget/reducers/group.rb
  - lib/wukong/widget/reducers/moments.rb
  - lib/wukong/widget/reducers/accumulator.rb
  - lib/wukong/widget/reducers/group_concat.rb
Overview
The Processor is the basic unit of computation in Wukong. A processor can be thought of as an arbitrary function that takes certain inputs and produces certain (or no) outputs.
A Processor can be written and tested purely in Ruby and on your local machine. You can glue processors together
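The idea of a processor as a function from one input record to zero or more outputs, and of gluing processors together, can be sketched in plain Ruby. The `Tokenizer` and `Upcaser` classes and the `chain` helper below are hypothetical and do not use the Wukong gem; they only mimic a `process(record) { |out| ... }` hook that yields each output to the next stage.

```ruby
# Hypothetical stand-ins for processors: each #process yields zero or more outputs.
class Tokenizer
  def process(record)
    record.split.each { |word| yield word }   # one input, many outputs
  end
end

class Upcaser
  def process(record)
    yield record.upcase                        # one input, one output
  end
end

# Glue stages together: every record emitted by one stage is fed to the next,
# and whatever leaves the last stage is handed to the sink block.
def chain(stages, record, &sink)
  first, *rest = stages
  return sink.call(record) if first.nil?
  first.process(record) { |out| chain(rest, out, &sink) }
end

outputs = []
chain([Tokenizer.new, Upcaser.new], "hello wukong world") { |out| outputs << out }
p outputs   # prints ["HELLO", "WUKONG", "WORLD"]
```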
Direct Known Subclasses
Accumulator, Extract, Filter, Flatten, Logger, Map, Serializer, Topic
Defined Under Namespace
Modules: BufferedProcessor, StdoutProcessor
Classes: Accumulator, Bin, Count, Extract, Filter, Flatten, FromCsv, FromDelimited, FromJson, FromTsv, Group, GroupConcat, Head, Identity, Limit, Logger, Map, Moments, NotRegexpFilter, Null, Recordize, RegexpFilter, Reject, Sample, Select, Serializer, Sort, Tail, ToCsv, ToDelimited, ToInspect, ToJson, ToTsv, Topic, Uniq
Constant Summary
- SerializerError = Class.new(Error)
Class Method Summary
- .configure(settings) ⇒ Object
- .consumes(*args) ⇒ Object
- .description(desc = nil) ⇒ Object
- .produces(*args) ⇒ Object
- .valid_serializer?(label) ⇒ Boolean
- .validate_and_set_serialization(direction, label) ⇒ Object
Instance Method Summary
- #expected_record_type(type) ⇒ Object
- #expected_serialization(direction) ⇒ Object
- #finalize {|record| ... } ⇒ Object
  This method is called to signal that the last record has been received but that further processing may still be done and events may still be yielded, etc.
- #perform_action(*args) {|record| ... } ⇒ Object
  When the processor is instantiated with a block, the block replaces this method.
- #process(record) {|record| ... } ⇒ Object
  This method is called once per record.
- #receive_action(action) ⇒ Object
  Turns the action attribute into the perform_action method.
- #setup ⇒ Object
  This method is called after the processor class has been instantiated but before any records are given to it to process.
- #stop ⇒ Object
  This method is called after all records have been passed to the processor.
Methods included from Logging
Methods included from Hanuman::StageClassMethods
#builder, #label, #register, #set_builder
Class Method Details
.configure(settings) ⇒ Object
    # File 'lib/wukong/processor.rb', line 47
    def configure(settings)
      settings.description = description if description
      fields.each_pair do |name, field|
        next if field.doc == false || field.doc.to_s == 'false'
        next if [:log, :notifier].include?(name)
        field_props = {}.tap do |props|
          props[:description] = field.doc unless field.doc == "#{name} field"
          field_type = (field.type.respond_to?(:product) ? field.type.product : field.type)
          configliere_type = case field_type
                             when String then nil
                             when TrueClass, FalseClass then :boolean
                             else field_type
                             end
          props[:type] = configliere_type if configliere_type
          props[:default] = field.default if field.default
        end
        existing_value = settings[name]
        settings.define(name, field_props)
        settings[name] = existing_value unless existing_value.nil?
      end
    end
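The filtering and type mapping performed by `configure` can be illustrated without Configliere. In this sketch, `Field` is a hypothetical Struct standing in for a field declaration and a plain Hash replaces the settings object; `settings.define` and the restoring of an already-set value are not modeled. Comparing types as class objects is an assumption about how field types are represented.

```ruby
# Hypothetical stand-in for a processor field declaration.
Field = Struct.new(:name, :type, :doc, :default)

# Follows the filtering and type-mapping steps of configure,
# but collects the properties in a plain Hash instead of a settings object.
def settings_definitions(fields)
  fields.each_with_object({}) do |field, defs|
    # Skip fields hidden from documentation and the logger/notifier fields.
    next if field.doc == false || field.doc.to_s == 'false'
    next if [:log, :notifier].include?(field.name)

    props = {}
    # Keep the doc string unless it is just the generic "<name> field".
    props[:description] = field.doc unless field.doc == "#{field.name} field"
    # Assumption: types are class objects; strings need no type, booleans map to :boolean.
    type =
      if field.type == String then nil
      elsif [TrueClass, FalseClass].include?(field.type) then :boolean
      else field.type
      end
    props[:type] = type if type
    props[:default] = field.default if field.default   # false/nil defaults are dropped
    defs[field.name] = props
  end
end

fields = [
  Field.new(:action,  Proc,      false,               nil),
  Field.new(:log,     Object,    "logger",            nil),
  Field.new(:verbose, TrueClass, "Print more output", false),
  Field.new(:sep,     String,    "sep field",         "\t")
]
p settings_definitions(fields)
```

Only `:verbose` and `:sep` survive: `:action` is hidden by its `false` doc, and `:log` is always skipped.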
.consumes(*args) ⇒ Object
    # File 'lib/wukong/processor.rb', line 27
    def consumes(*args)
      options = args.extract_options!
      @consumes = options[:as]
      validate_and_set_serialization(:from, args.first)
    end
.description(desc = nil) ⇒ Object
    # File 'lib/wukong/processor.rb', line 22
    def description desc=nil
      @description = desc if desc
      @description
    end
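`description` is a class-level method that acts as both writer (when given an argument) and reader. A minimal plain-Ruby sketch of the same pattern, using hypothetical `Base` and `Upcaser` classes, also shows that the value lives in a class-level instance variable and is therefore not shared with a parent class:

```ruby
# A class-level attribute that acts as both reader and writer,
# following the shape of the description method above.
class Base
  class << self
    def description(desc = nil)
      @description = desc if desc   # only assign when an argument is given
      @description                  # always return the current value
    end
  end
end

class Upcaser < Base
  description "Upcases every record"
end

puts Upcaser.description          # prints Upcases every record
puts Base.description.inspect     # prints nil: each class keeps its own @description
```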
.produces(*args) ⇒ Object
    # File 'lib/wukong/processor.rb', line 33
    def produces(*args)
      options = args.extract_options!
      @produces = options[:as]
      validate_and_set_serialization(:to, args.first)
    end
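`consumes` and `produces` appear to take a serialization label as the first positional argument and a record type through an `:as` option. The sketch below is a hypothetical plain-Ruby class that splits such a declaration the same way; it stores the label directly and leaves out the tsv/json/xml check performed by `validate_and_set_serialization`.

```ruby
# Hypothetical stand-in showing how a declaration such as
#   consumes :tsv, as: Array
# splits into a positional serialization label and an :as option.
class ExampleProcessor
  class << self
    def consumes(*args)
      options = args.last.is_a?(Hash) ? args.pop : {}   # split off a trailing options hash
      @consumes = options[:as]                           # record type, e.g. Array
      @serialization_from = args.first                   # serialization label, e.g. :tsv
    end

    def produces(*args)
      options = args.last.is_a?(Hash) ? args.pop : {}
      @produces = options[:as]
      @serialization_to = args.first
    end
  end

  consumes :tsv, as: Array
  produces :json, as: Hash
end

p ExampleProcessor.instance_variable_get(:@consumes)          # prints Array
p ExampleProcessor.instance_variable_get(:@serialization_to)  # prints :json
```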
.valid_serializer?(label) ⇒ Boolean
    # File 'lib/wukong/processor.rb', line 39
    def valid_serializer? label
      label
    end
.validate_and_set_serialization(direction, label) ⇒ Object
    # File 'lib/wukong/processor.rb', line 43
    def validate_and_set_serialization(direction, label)
      instance_variable_set("@serialization_#{direction}", label) if %w[ tsv json xml ].include?(label.to_s)
    end
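Only the labels `tsv`, `json`, and `xml` are recorded; any other label is silently ignored and the corresponding `@serialization_*` variable stays unset. A standalone sketch of that check, using a hypothetical `Stage` class:

```ruby
class Stage
  KNOWN = %w[tsv json xml].freeze   # same whitelist as validate_and_set_serialization

  def self.validate_and_set_serialization(direction, label)
    # Symbols and strings both work because the label is compared via #to_s.
    instance_variable_set("@serialization_#{direction}", label) if KNOWN.include?(label.to_s)
  end
end

Stage.validate_and_set_serialization(:from, :json)
Stage.validate_and_set_serialization(:to, :csv)          # not whitelisted: ignored

p Stage.instance_variable_get(:@serialization_from)     # prints :json
p Stage.instance_variable_defined?(:@serialization_to)  # prints false
```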
Instance Method Details
#expected_record_type(type) ⇒ Object
    # File 'lib/wukong/processor.rb', line 72
    def expected_record_type(type)
      self.class.instance_variable_get("@#{type}")
    end
#expected_serialization(direction) ⇒ Object
    # File 'lib/wukong/processor.rb', line 76
    def expected_serialization(direction)
      self.class.instance_variable_get("@serialization_#{direction.to_s}")
    end
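`expected_record_type` and `expected_serialization` let an instance read declarations that were stored as instance variables on its class object. In this sketch the hypothetical `Parser` class sets those class-level variables directly rather than through `consumes`:

```ruby
# Class-level declarations live on the class object; instances reach them
# through self.class, as the two methods above do.
class Parser
  @consumes = String
  @serialization_from = :json

  def expected_record_type(type)
    self.class.instance_variable_get("@#{type}")
  end

  def expected_serialization(direction)
    self.class.instance_variable_get("@serialization_#{direction}")
  end
end

parser = Parser.new
p parser.expected_record_type(:consumes)   # prints String
p parser.expected_serialization(:from)     # prints :json
p parser.expected_serialization(:to)       # prints nil (never declared)
```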
#finalize {|record| ... } ⇒ Object
This method is called to signal that the last record has been received but that further processing may still be done and events may still be yielded, etc.
This can be used within an aggregating processor (like a reducer in a map/reduce job) to start processing the final aggregate of records, since the "last record" has already been received.
Override this method in your subclass.
    # File 'lib/wukong/processor.rb', line 128
    def finalize
    end
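An aggregating processor typically accumulates state in `process` and emits only once the last record has been seen. The hypothetical `WordCounter` below is driven by hand rather than by a Wukong runner; it is a sketch of the pattern, not one of the reducers listed on this page.

```ruby
# Hypothetical counting processor: #process only accumulates state,
# and #finalize yields the aggregate after the last record.
class WordCounter
  def setup
    @counts = Hash.new(0)
  end

  def process(record)
    @counts[record] += 1   # nothing is yielded per record
  end

  def finalize
    @counts.sort.each { |word, count| yield [word, count] }
  end
end

counter = WordCounter.new
counter.setup
%w[b a b c b].each { |word| counter.process(word) }
results = []
counter.finalize { |pair| results << pair }
p results   # prints [["a", 1], ["b", 3], ["c", 1]]
```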
#perform_action(*args) {|record| ... } ⇒ Object
When instantiated with a block, the block will replace this method.
    # File 'lib/wukong/processor.rb', line 86
    def perform_action(*args)
    end
#process(record) {|record| ... } ⇒ Object
This method is called once per record.
Override this method in your subclass.
    # File 'lib/wukong/processor.rb', line 112
    def process(record, &emit)
      yield record
    end
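The default `process` passes each record through unchanged, but an override may yield nothing (a filter) or several records (a splitter). The hypothetical classes and `run` helper below sketch those three shapes in plain Ruby:

```ruby
# Hypothetical processors showing three shapes of #process.
class Identity
  def process(record)
    yield record                  # exactly one output
  end
end

class EvenFilter
  def process(record)
    yield record if record.even?  # zero or one output
  end
end

class Repeater
  def process(record)
    record.times { yield record } # several outputs
  end
end

# Feed each record to the processor and collect everything it yields.
def run(processor, records)
  out = []
  records.each { |r| processor.process(r) { |emitted| out << emitted } }
  out
end

p run(Identity.new,   [1, 2, 3])   # prints [1, 2, 3]
p run(EvenFilter.new, [1, 2, 3])   # prints [2]
p run(Repeater.new,   [1, 2, 3])   # prints [1, 2, 2, 3, 3, 3]
```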
#receive_action(action) ⇒ Object
The action attribute is turned into the perform_action method.
    # File 'lib/wukong/processor.rb', line 94
    def receive_action(action)
      self.define_singleton_method(:perform_action, &action)
    end
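Because `receive_action` uses `define_singleton_method`, the supplied block replaces `perform_action` on that one instance only; other instances keep the empty default. In this sketch `ActionHolder` and the lambda are hypothetical stand-ins for a processor and its `action` attribute:

```ruby
# Hypothetical stand-in: a block handed to receive_action becomes
# perform_action for this one object.
class ActionHolder
  def perform_action(*args)
    # default: does nothing, like the empty perform_action above
  end

  def receive_action(action)
    define_singleton_method(:perform_action, &action)
  end
end

shouter = ActionHolder.new
shouter.receive_action(->(text) { text.upcase + "!" })
plain = ActionHolder.new

p shouter.perform_action("hi")   # prints "HI!"
p plain.perform_action("hi")     # prints nil
```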
#setup ⇒ Object
This method is called after the processor class has been instantiated but before any records are given to it to process.
Override this method in your subclass.
    # File 'lib/wukong/processor.rb', line 102
    def setup
    end
#stop ⇒ Object
This method is called after all records have been passed. It signals that processing should stop.
Override this method in your subclass.
    # File 'lib/wukong/processor.rb', line 135
    def stop
    end
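Taken together, the hook descriptions suggest a lifecycle of `setup` once, `process` once per record, then `finalize`, then `stop`. The hypothetical `TracingProcessor` and `drive` helper below record that order; the finalize-before-stop ordering is an assumption drawn from the descriptions, not from a Wukong runner.

```ruby
# Hypothetical processor that records which hooks are called, in order.
class TracingProcessor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def setup;           @calls << :setup;             end
  def process(record); @calls << [:process, record]; end
  def finalize;        @calls << :finalize;          end
  def stop;            @calls << :stop;              end
end

# Hypothetical driver: setup once, process each record, then finalize and stop.
def drive(processor, records)
  processor.setup
  records.each { |record| processor.process(record) }
  processor.finalize
  processor.stop
  processor
end

trace = drive(TracingProcessor.new, [:x, :y]).calls
p trace   # prints [:setup, [:process, :x], [:process, :y], :finalize, :stop]
```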