Module: Skynet::Partitioner

Defined in:
lib/skynet/skynet_job.rb

Overview

Collection of partitioning utilities

Class Method Summary collapse

Class Method Details

.args_pp(*args) ⇒ Object

Formats the given arguments as a single-line, pretty-printed string; returns an empty string when no arguments are given.



514
515
516
# File 'lib/skynet/skynet_job.rb', line 514

# Renders the given arguments as one pretty-printed string.
#
# args - any number of values.
#
# Returns an empty String when called without arguments; otherwise the
# result of calling pretty_print_inspect on the argument Array (a method
# supplied by Ruby's pp library, which must be loaded).
def self.args_pp(*args)
  return '' if args.empty?

  "#{args.pretty_print_inspect}"
end

.array_data_split_by_first_entry ⇒ Object

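Smarter partitioner for array data: derives a key from the first entry of each two-element array (the entry itself when it is an integer, otherwise the sum of its bytes) and ensures that all arrays sharing that key go into the same partition.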



583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
# File 'lib/skynet/skynet_job.rb', line 583

# Builds a partitioner that groups two-element arrays by their first entry.
#
# The returned lambda takes:
#   partitioned_data - an Array of partitions, each an Array of entries.
#   new_partitions   - the number of buckets to distribute entries into.
#
# Every entry that is exactly an Array of size 2 is appended to bucket
# `key % new_partitions`, where the key is the first element itself when it
# is an Integer, or the sum of its bytes otherwise (so the first element is
# expected to respond to each_byte, e.g. a String). Entries of any other
# shape are skipped.
#
# The lambda returns an Array of new_partitions Arrays.
def self.array_data_split_by_first_entry
  lambda do |partitioned_data, new_partitions|
    buckets = (0...new_partitions).map { [] }

    partitioned_data.each do |partition|
      partition.each do |pair|
        next unless pair.instance_of?(Array) && pair.size == 2

        first = pair[0]
        # Integer, not Fixnum: the Fixnum constant no longer exists in
        # current Ruby, and Integer also covers arbitrarily large keys.
        key = if first.kind_of?(Integer)
                first
              else
                first.each_byte.inject(0) { |sum, byte| sum + byte }
              end

        buckets[key % new_partitions] << pair
      end
    end

    buckets
  end
end

.debug(msg, *args) ⇒ Object



518
519
520
521
# File 'lib/skynet/skynet_job.rb', line 518

# Sends a debug-level line tagged "PARTITION" to the logger returned by
# Skynet::Logger.get.
#
# msg  - text placed after the tag.
# args - optional extra values, rendered with args_pp and appended.
#
# Returns whatever the logger's debug call returns.
#
# NOTE(review): in a method defined with `def self.` on a module,
# `self.class` evaluates to Module, so the prefix is the class of the
# receiver rather than the module's own name - confirm this is intended.
def self.debug(msg, *args)
  logger = Skynet::Logger.get
  line = "#{self.class} PARTITION: #{msg} #{args_pp(*args)}"
  logger.debug(line)
end

.recombine_and_split ⇒ Object

Tries to be smart about what kind of data it's getting, whether an array of arrays or an array of arrays of arrays.



562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
# File 'lib/skynet/skynet_job.rb', line 562

# Builds a partitioner that merges the map output back into one list and
# re-splits it with Partitioner.simple_partition_data.
#
# The returned lambda takes:
#   post_map_data  - the collected map results.
#   new_partitions - partition count handed to simple_partition_data.
#
# Input that is not a non-empty Array whose first element is itself a
# non-empty Array is returned untouched. Otherwise:
#   * when the first element's first item is not an Array, the whole input
#     is fully flattened;
#   * when it is an Array, the outer elements are concatenated one level
#     deep, leaving the inner arrays intact.
# The merged list is then re-partitioned, logged through debug, and
# returned.
def self.recombine_and_split
  lambda do |post_map_data, new_partitions|
    nested_and_populated =
      post_map_data.is_a?(Array) && !post_map_data.empty? &&
      post_map_data.first.is_a?(Array) && !post_map_data.first.empty?
    return post_map_data unless nested_and_populated

    combined =
      if post_map_data.first.first.is_a?(Array)
        # Array of arrays of arrays: strip only the outermost level.
        post_map_data.inject([]) { |merged, part| merged + part }
      else
        post_map_data.flatten
      end

    repartitioned = Partitioner::simple_partition_data(combined, new_partitions)
    debug "POST PARTITIONED DATA", repartitioned
    repartitioned
  end
end

.simple_partition_data(data, partitions) ⇒ Object



523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
# File 'lib/skynet/skynet_job.rb', line 523

# Splits one flat collection into at most +partitions+ non-empty groups.
#
# data       - the collection to split (nil, false, or empty input is
#              returned as-is).
# partitions - the desired number of groups; expected to be a positive
#              Integer.
#
# Three strategies are used, depending on how the sizes compare:
#   * partitions >= data.length: every element gets a group of its own, so
#     fewer groups than requested may be returned.
#   * data.length >= partitions * 2: contiguous slices of a fixed,
#     rounded-up size. This is quick but less "fair": the last slice may be
#     smaller, and fewer than +partitions+ slices are returned when the
#     rounded-up size already covers all of the data.
#   * otherwise: round-robin assignment, which spreads elements evenly
#     over exactly +partitions+ groups.
#
# Returns an Array of Arrays.
def self.simple_partition_data(data, partitions)
  return data if !data || data.empty?

  if partitions >= data.length
    data.map { |datum| [datum] }
  elsif data.length >= partitions * 2
    slice_size = data.length / partitions
    # Round up so leftovers fit without exceeding the requested count.
    slice_size += 1 unless (data.length % partitions).zero?

    # each_slice stops once the data is exhausted. Indexing a fixed number
    # of slices instead would produce trailing [] or nil entries whenever
    # the rounded-up size covers the data in fewer slices than requested.
    data.each_slice(slice_size).to_a
  else
    # This branch is only reached when partitions < data.length, so every
    # bucket receives at least one element.
    buckets = (0...partitions).map { [] }
    data.each_with_index do |datum, index|
      buckets[index % partitions] << datum
    end
    buckets
  end
end