Module: Skynet::Partitioner
Defined in: lib/skynet/skynet_job.rb
Overview
Collection of partitioning utilities
Class Method Summary
- .args_pp(*args) ⇒ Object
  Returns the arguments pretty-printed on a single line, or an empty string when no arguments are given.
- .array_data_split_by_first_entry ⇒ Object
  Smarter partitioner for array data: derives an integer key from the first entry of each two-element array (the integer itself, or the sum of its bytes otherwise) and ensures that all arrays sharing that key go into the same partition.
- .debug(msg, *args) ⇒ Object
- .recombine_and_split ⇒ Object
  Tries to be smart about what kind of data it is getting, whether an array of arrays or an array of arrays of arrays.
- .simple_partition_data(data, partitions) ⇒ Object
  Splits one block of data into partitions.
Class Method Details
.args_pp(*args) ⇒ Object
Returns the arguments pretty-printed on a single line (via `pretty_print_inspect`), or an empty string when no arguments are given.
```ruby
# File 'lib/skynet/skynet_job.rb', line 514
def self.args_pp(*args)
  "#{args.length > 0 ? args.pretty_print_inspect : ''}"
end
```
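The helper relies on `pretty_print_inspect` from Ruby's `pp` library, which renders the collected argument array on one line. A minimal standalone sketch, assuming a hypothetical module name `ArgsPPDemo` and adding `require 'pp'` in case the library is not already loaded:

```ruby
require 'pp'

# Hypothetical module holding a standalone copy of args_pp
module ArgsPPDemo
  # Pretty-print the argument list on one line, or return '' when empty
  def self.args_pp(*args)
    "#{args.length > 0 ? args.pretty_print_inspect : ''}"
  end
end

p ArgsPPDemo.args_pp           # no arguments
p ArgsPPDemo.args_pp(1, "a")   # arguments are collected into an Array
```

Because `*args` always collects an Array, even a single argument is shown in brackets, so `args_pp(42)` gives `[42]`.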
.array_data_split_by_first_entry ⇒ Object
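Smarter partitioner for array data: derives an integer key from the first entry of each two-element array (the integer itself, or the sum of its bytes otherwise) and ensures that all arrays sharing that key go into the same partition. Elements that are not two-element arrays are skipped.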
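```ruby
# File 'lib/skynet/skynet_job.rb', line 583
def self.array_data_split_by_first_entry
  lambda do |partitioned_data, new_partitions|
    # One empty bucket per output partition
    partitions = Array.new(new_partitions) { Array.new }
    partitioned_data.each do |partition|
      partition.each do |array|
        # Only [key, value] pairs are partitioned; anything else is dropped
        next unless array.is_a?(Array) and array.size == 2
        # Integer keys are used directly (Fixnum was removed in Ruby 3.2)
        if array[0].kind_of?(Integer)
          key = array[0]
        else
          # Other keys (e.g. strings) are reduced to the sum of their bytes
          key = 0
          array[0].each_byte { |c| key += c }
        end
        # Equal keys always map to the same bucket
        partitions[key % new_partitions] << array
      end
    end
    partitions
  end
end
```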
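To see how keys are assigned, here is a standalone sketch of the same logic outside Skynet. The method name `split_by_first_entry` is hypothetical, and the sketch assumes Ruby 2.4 or later for `Integer` and `Enumerable#sum`. String keys are reduced to the sum of their bytes, so `"ab"` (97 + 98 = 195) lands in partition `195 % 3 = 0`, while the integer key `7` lands in partition `7 % 3 = 1`:

```ruby
# Hypothetical standalone copy of the key-based splitter, for illustration only
def split_by_first_entry(partitioned_data, new_partitions)
  partitions = Array.new(new_partitions) { [] }
  partitioned_data.each do |partition|
    partition.each do |pair|
      # Skip anything that is not a [key, value] pair
      next unless pair.is_a?(Array) && pair.size == 2
      # Integers are their own key; strings use the sum of their bytes
      key = pair[0].is_a?(Integer) ? pair[0] : pair[0].each_byte.sum
      partitions[key % new_partitions] << pair
    end
  end
  partitions
end

map_output = [
  [["ab", 1], [7, :x]],
  [["ab", 2], ["ba", 3], :not_a_pair]
]
p split_by_first_entry(map_output, 3)
```

The byte sum ignores character order, so `"ab"` and `"ba"` collide: equal keys are guaranteed to share a partition, but distinct keys may share one too. The non-pair element `:not_a_pair` is silently dropped.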
.debug(msg, *args) ⇒ Object
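```ruby
# File 'lib/skynet/skynet_job.rb', line 518
def self.debug(msg, *args)
  log = Skynet::Logger.get
  # `self` is the Partitioner module here, so `self.name` gives
  # "Skynet::Partitioner"; `self.class` would only give "Module".
  log.debug "#{self.name} PARTITION: #{msg} #{args_pp(*args)}"
end
```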
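The logging helper can be tried without Skynet by substituting Ruby's standard `Logger` for `Skynet::Logger.get`; this substitution and the module name `PartitionerLogDemo` are assumptions for illustration. The sketch also shows that inside a module method `self` is the module, so `name` yields the module's name while `self.class` yields `Module`:

```ruby
require 'logger'
require 'pp'
require 'stringio'

# Hypothetical stand-in: a stdlib Logger writing to a StringIO replaces
# Skynet::Logger.get so the output can be inspected.
module PartitionerLogDemo
  LOG_OUTPUT = StringIO.new
  LOG = Logger.new(LOG_OUTPUT)

  def self.args_pp(*args)
    args.empty? ? '' : args.pretty_print_inspect
  end

  def self.debug(msg, *args)
    # `name` is "PartitionerLogDemo"; `self.class` would be Module
    LOG.debug "#{name} PARTITION: #{msg} #{args_pp(*args)}"
  end
end

PartitionerLogDemo.debug("POST PARTITIONED DATA", [[1, 2], [3]])
puts PartitionerLogDemo::LOG_OUTPUT.string
```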
.recombine_and_split ⇒ Object
Tries to be smart about what kind of data it is getting, whether an array of arrays or an array of arrays of arrays. The data is recombined into a single array and then re-split with simple_partition_data.
```ruby
# File 'lib/skynet/skynet_job.rb', line 562
def self.recombine_and_split
  lambda do |post_map_data, new_partitions|
    # Pass through anything that is not a non-empty array whose
    # first element is a non-empty array
    return post_map_data unless post_map_data.is_a?(Array) and (not post_map_data.empty?) and
                                post_map_data.first.is_a?(Array) and (not post_map_data.first.empty?)
    if not post_map_data.first.first.is_a?(Array)
      # Array of arrays: flatten everything into one list
      partitioned_data = post_map_data.flatten
    else
      # Array of arrays of arrays: concatenate one level so inner arrays survive
      partitioned_data = post_map_data.inject(Array.new) do |data, part|
        data + part
      end
    end
    partitioned_data = Partitioner.simple_partition_data(partitioned_data, new_partitions)
    debug "POST PARTITIONED DATA", partitioned_data
    partitioned_data
  end
end
```
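Only the first element is inspected to decide which shape the data has. The sketch below isolates that decision; the helper name `recombine` is hypothetical, and the final call to `simple_partition_data` is left out:

```ruby
# Hypothetical helper isolating the shape-detection step of recombine_and_split
def recombine(post_map_data)
  # Anything other than a non-empty array whose first element is a
  # non-empty array is passed through unchanged
  return post_map_data unless post_map_data.is_a?(Array) && !post_map_data.empty? &&
                              post_map_data.first.is_a?(Array) && !post_map_data.first.empty?

  if !post_map_data.first.first.is_a?(Array)
    # Array of arrays of scalars: flatten completely
    post_map_data.flatten
  else
    # Array of arrays of arrays: concatenate one level so inner pairs survive
    post_map_data.inject([]) { |data, part| data + part }
  end
end

p recombine([[1, 2], [3]])
p recombine([[[:a, 1], [:b, 2]], [[:a, 3]]])
p recombine([[], [1]])
```

The second call shows why the deeper shape is concatenated rather than flattened: `flatten` would turn `[[:a, 1], [:b, 2]]` into `[:a, 1, :b, 2]` and lose the key/value pairing. The third call is returned unchanged because its first element is empty, even though a later element is not.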
.simple_partition_data(data, partitions) ⇒ Object
Splits one block of data into partitions.
```ruby
# File 'lib/skynet/skynet_job.rb', line 523
def self.simple_partition_data(data, partitions)
  partitioned_data = Array.new

  return data if (not data) or data.empty?

  if partitions >= data.length
    # At least as many partitions as elements: one element per partition
    data.each do |datum|
      partitioned_data << [datum]
    end
  elsif (data.length >= partitions * 2)
    # If data size is significantly greater than the number of desired
    # partitions, we can divide the data roughly into contiguous slices,
    # but the last partitions may be smaller than the others (or empty).
    # Use quicker but less "fair" method
    size = data.length / partitions
    if (data.length % partitions != 0)
      size += 1 # Last slice of leftovers
    end
    (0..partitions - 1).each do |i|
      # A slice starting past the end returns nil; use an empty partition instead
      partitioned_data[i] = data[i * size, size] || Array.new
    end
  else
    # Slower method, but partitions evenly (round-robin)
    partitions = (data.size < partitions ? data.size : partitions)
    (0..partitions - 1).each { |i| partitioned_data[i] = Array.new }
    data.each_with_index do |datum, i|
      partitioned_data[i % partitions] << datum
    end
  end

  partitioned_data
end
```
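The three branches behave quite differently depending on how `data.length` compares with `partitions`. The standalone sketch below mirrors the method so the branches can be compared side by side; it is an illustration rather than Skynet's code, and the `|| []` guard is an assumption (without it, a slice that starts past the end of `data` yields `nil` instead of an empty partition):

```ruby
# Standalone copy of simple_partition_data for experimentation
def simple_partition_data(data, partitions)
  return data if data.nil? || data.empty?

  if partitions >= data.length
    # One element per partition
    data.map { |datum| [datum] }
  elsif data.length >= partitions * 2
    # Contiguous slices of ceil(length / partitions) elements
    size = data.length / partitions
    size += 1 if data.length % partitions != 0
    Array.new(partitions) { |i| data[i * size, size] || [] }
  else
    # Round-robin: element i goes to partition i % partitions
    buckets = Array.new(partitions) { [] }
    data.each_with_index { |datum, i| buckets[i % partitions] << datum }
    buckets
  end
end

p simple_partition_data([1, 2, 3], 5)
p simple_partition_data((1..10).to_a, 4)
p simple_partition_data((1..5).to_a, 3)
p simple_partition_data((1..13).to_a, 6)
```

With 10 elements and 4 partitions the quick branch rounds the slice size up to 3, giving slices of 3, 3, 3 and 1 elements. With 13 elements and 6 partitions it also rounds up to 3, so the fifth slice holds a single element and the sixth is empty. The round-robin branch (5 elements, 3 partitions) instead yields `[[1, 4], [2, 5], [3]]`, trading speed for balance.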