Class: Wukong::Processor::Sort

Inherits:
Accumulator
Includes:
DynamicGet
Defined in:
lib/wukong/widget/reducers/sort.rb

Overview

Sorts input records.

For many use cases you're better off using native tools like /bin/sort because they are faster and already do what you need.

Other times, you need something that can introspect more on its input, such as sorting JSON records on a named field.

The sort widget is useful for modeling Hadoop jobs, but don't forget that Hadoop does its own sorting: it sorts the mapper's output by key before that output reaches the reducer. The sort widget therefore doesn't belong in your map/reduce jobs.

Examples:

When /bin/sort is more than enough on the command line


$ cat input
1 apple
2 banana
3 cat
4 banana
...
$ cat input | sort -k2
1 apple
2 banana
4 banana
3 cat
...

When you may prefer the sort widget on the command line


$ cat input
{"id": 1, "word": "apple" }
{"id": 2, "word": "cat"   }
{"id": 3, "word": "banana"}
...
$ cat input | wu-local sort --on word
{"id": 1, "word": "apple" }
{"id": 3, "word": "banana"}    
{"id": 2, "word": "cat"   }
...
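To show what "introspecting the input" means here, the following is a minimal plain-Ruby sketch of sorting newline-delimited JSON on one field, roughly what `wu-local sort --on word` does. It is not the widget's implementation. The helper `sort_json_lines` is hypothetical, and the sketch assumes every record has the field.

```ruby
require 'json'

# Hypothetical helper (not part of Wukong): parse each JSON line and
# sort the resulting Hashes on the value stored under `field`.
def sort_json_lines(lines, field)
  records = lines.map { |line| JSON.parse(line) }
  records.sort_by { |record| record[field] }
end

input = [
  '{"id": 1, "word": "apple" }',
  '{"id": 2, "word": "cat"   }',
  '{"id": 3, "word": "banana"}'
]

# Prints {"id":1,"word":"apple"}, then banana's record, then cat's.
sort_json_lines(input, 'word').each { |record| puts record.to_json }
```

`/bin/sort` cannot do this directly, because the sort key is a named field inside a structured record rather than a whitespace-delimited column.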

The wrong way to model a Hadoop map/reduce job


Wukong.dataflow(:my_incorrect_job_dataflow) do
  parse | extract(part: 'country') | sort | group
end

The right way to model a Hadoop map/reduce job


Wukong.dataflow(:mapper) do
  parse | extract(part: 'country')
end

Wukong.dataflow(:reducer) do
  group
end
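The reason the reducer needs no sort step can be illustrated with a toy, in-memory simulation of a map/reduce run. It is written for illustration only and is not Wukong or Hadoop code. The country codes are made-up data, and the reducer simply counts each key; this sketch makes no claim about what the real group widget emits.

```ruby
# Made-up input: one country code per record.
records = %w[us fr us de fr us]

# Map phase: emit [key, value] pairs, here [country, 1].
mapped = records.map { |country| [country, 1] }

# Shuffle/sort phase: in a real job Hadoop does this for you, sorting by
# key so that equal keys reach the reducer next to each other.
shuffled = mapped.sort_by { |key, _value| key }

# Reduce phase: because the input is already sorted, grouping only has
# to look at adjacent records with the same key.
counts = shuffled.chunk_while { |a, b| a[0] == b[0] }
                 .map { |group| [group.first[0], group.size] }

p counts # => [["de", 1], ["fr", 2], ["us", 3]]
```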

Constant Summary

Constants inherited from Wukong::Processor

SerializerError

Instance Attribute Summary

Attributes inherited from Accumulator

#group, #key

Attributes included from Hanuman::StageInstanceMethods

#graph

Instance Method Summary

Methods included from DynamicGet

#get, #get_nested, included

Methods inherited from Accumulator

#process, #start

Methods inherited from Wukong::Processor

configure, description, #perform_action, #process, #receive_action, #stop

Methods included from Logging

included

Methods inherited from Hanuman::Stage

#clone

Methods included from Hanuman::StageClassMethods

#builder, #label, #register, #set_builder

Methods included from Hanuman::StageInstanceMethods

#add_stage_link, #linkable_name, #root

Instance Method Details

#accumulate(record) ⇒ Object

Stores the record for later sorting.

Parameters:

  • record (Object)


# File 'lib/wukong/widget/reducers/sort.rb', line 138

def accumulate(record)
  @records << record
end

#compare(x, y) ⇒ 1, ...

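Compares records x and y using their sortable parts.

When numeric is set, both parts are converted with to_f before comparing, so "9" sorts before "10". A record whose sortable part is missing (nil or false) sorts before a record that has one.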

Parameters:

  • x (Object)
  • y (Object)

Returns:

  • (1, 0, -1)

    -1 if x sorts before y, 0 if they tie, 1 if x sorts after y



# File 'lib/wukong/widget/reducers/sort.rb', line 168

def compare(x, y)
  a = (sortable(x) or return -1) 
  b = (sortable(y) or return  1)
  if numeric
    a = a.to_f ; b = b.to_f
  end
  a <=> b
end
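The effect of the numeric setting and of missing values can be checked with a standalone re-creation of the comparison above. This is a sketch for illustration: the extractor lambda and the `numeric:` keyword are assumptions that replace the widget's own `sortable` method and `numeric` setting, and the records are made-up data.

```ruby
# Standalone re-creation of Sort#compare for illustration only.
# `sortable` is any callable that extracts the sort key from a record.
def compare(x, y, sortable:, numeric: false)
  a = (sortable.call(x) or return -1) # x has no sortable part: x first
  b = (sortable.call(y) or return 1)  # y has no sortable part: y first
  if numeric
    a = a.to_f
    b = b.to_f
  end
  a <=> b
end

by_n = ->(record) { record['n'] }
records = [{ 'n' => '10' }, { 'n' => '9' }, { 'n' => '100' }]

lexical = records.sort { |x, y| compare(x, y, sortable: by_n) }
numeric = records.sort { |x, y| compare(x, y, sortable: by_n, numeric: true) }

p lexical.map { |r| r['n'] } # => ["10", "100", "9"]
p numeric.map { |r| r['n'] } # => ["9", "10", "100"]
p compare({}, { 'n' => '1' }, sortable: by_n) # => -1
```

Without the numeric setting, string values compare character by character, which is why "100" lands between "10" and "9".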

#finalize {|record| ... } ⇒ Object

Sorts all the stored records by their sortable part and yields them one at a time in sorted order, reversing that order when reverse is set.

Yields:

  • (record)

    each record in correct sort order



# File 'lib/wukong/widget/reducers/sort.rb', line 147

def finalize
  sorted = @records.sort{ |x, y| compare(x, y) }
  sorted.reverse! if reverse
  sorted.each{ |record| yield record }
end
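Taken together, setup, accumulate, and finalize buffer every record and sort only once at the end. The class below is a hypothetical, dependency-free mimic of that lifecycle, not the widget itself. `TinySort`, `on:`, and `reverse:` are illustrative names, and its comparison is simplified (no numeric option, no missing-value handling).

```ruby
# Hypothetical mimic of Sort's setup/accumulate/finalize lifecycle.
class TinySort
  def initialize(on:, reverse: false)
    @on = on
    @reverse = reverse
  end

  # Start with an empty buffer, as Sort#setup does.
  def setup
    @records = []
  end

  # Buffer each record; nothing is emitted yet.
  def accumulate(record)
    @records << record
  end

  # Sort once, optionally reverse, then yield records one at a time.
  def finalize
    sorted = @records.sort { |x, y| x[@on] <=> y[@on] } # simplified compare
    sorted.reverse! if @reverse
    sorted.each { |record| yield record }
  end
end

sorter = TinySort.new(on: 'word', reverse: true)
sorter.setup
[{ 'word' => 'apple' }, { 'word' => 'cat' }, { 'word' => 'banana' }].each do |r|
  sorter.accumulate(r)
end
sorter.finalize { |record| puts record['word'] } # prints cat, banana, apple
```

Because every record is held in memory until finalize runs, this approach suits modest local inputs rather than data sets that would not fit in memory.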

#get_key(record) ⇒ :__first_group__

Returns the same key for every record, which keeps all the records in a single group so they can be sorted together.

Parameters:

  • record (Object)

Returns:

  • (:__first_group__)


# File 'lib/wukong/widget/reducers/sort.rb', line 131

def get_key(record)
  :__first_group__
end
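Why a constant key matters can be shown with a toy grouping function. The sketch assumes, as is typical for reducer-style accumulators, that a new group starts whenever the key of a record differs from the key of the previous one. `groups_by_key` is a hypothetical helper, not Wukong's Accumulator code.

```ruby
# Hypothetical helper: split records into runs of adjacent equal keys,
# the way a key-change accumulator would see them.
def groups_by_key(records, &get_key)
  records.chunk_while { |a, b| get_key.call(a) == get_key.call(b) }.to_a
end

words = %w[cat apple cat banana]

# Keying on the value itself breaks unsorted input into many groups...
per_word = groups_by_key(words) { |w| w }
# ...while a constant key keeps every record in one group.
single = groups_by_key(words) { |_w| :__first_group__ }

p per_word.size # => 4
p single        # => [["cat", "apple", "cat", "banana"]]
```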

#setup ⇒ Object

Initializes the array that will hold all the records.



# File 'lib/wukong/widget/reducers/sort.rb', line 122

def setup
  super()
  @records = []
end

#sortable(record) ⇒ Object

Extracts the sortable part of the input record.

Parameters:

  • record (Object)

Returns:

  • (Object)

    the part of the record to sort on



# File 'lib/wukong/widget/reducers/sort.rb', line 157

def sortable(record)
  get(self.on, record)
end
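sortable delegates the lookup to DynamicGet#get with the configured on field. The function below is a hypothetical stand-in written only to illustrate field extraction; the real module's lookup rules are not reproduced here and may differ. It follows a dotted path through Hashes (string or symbol keys) and plain objects, returning nil when a part is missing.

```ruby
# Hypothetical stand-in for DynamicGet#get, for illustration only.
def fetch_field(path, record)
  path.to_s.split('.').reduce(record) do |obj, part|
    case obj
    when Hash
      # Prefer a string key, fall back to the symbol form.
      obj.key?(part) ? obj[part] : obj[part.to_sym]
    when nil
      nil # an earlier part was missing
    else
      # Treat plain objects as having reader methods.
      obj.respond_to?(part) ? obj.public_send(part) : nil
    end
  end
end

Point = Struct.new(:x, :y)

p fetch_field('word', { 'word' => 'apple' })             # => "apple"
p fetch_field('geo.country', { geo: { country: 'fr' } }) # => "fr"
p fetch_field('x', Point.new(3, 4))                      # => 3
p fetch_field('missing', { 'word' => 'apple' })          # => nil
```

A nil result is the case that compare treats as "no sortable part", so such records sort first.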