Class: MongoMapperParallel

Inherits:
Object
  • Object
Defined in:
lib/mongo_mapper_parallel.rb

Defined Under Namespace

Classes: Key

Constructor Details

#initialize(opts = {}) ⇒ MongoMapperParallel

Instantiates the parallel operation object with the target class, the JavaScript function to run, and the field to split on.

Parameters:

  • opts (Hash) (defaults to: {})

    the options to initialize the parallel script.

Options Hash (opts):

  • :class (Class)

    the Mongo collection’s Ruby Class to execute operations on.

  • :javascript (String)

    the JavaScript function, as a String.

  • :args (Array, Hash)

    the arguments to pass to the JavaScript function.

  • :split (String, Symbol)

    the field to split the computation on – typically an indexed unique property of the resources in the collection.

  • :maxChunkSizeBytes (Fixnum)

    the maximum size, in bytes, of each chunk to parallelize. Defaults to `32*1024*1024 = 33554432`.

  • :debug (Boolean)

    whether to show messages during the process. Defaults to `true` when the option is omitted.



# File 'lib/mongo_mapper_parallel.rb', line 119

def initialize(opts={})
	@command_class = opts[:class]
	@javascript    = opts[:javascript]
	@args          = opts[:args]
	@split         = opts[:split] # name, title, etc...
	@splitSize     = opts[:maxChunkSizeBytes] || 32*1024*1024
	@debug         = opts[:debug].nil? ? true : opts[:debug]
	get_split_keys()
	self
end
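
The two defaulted options are resolved differently in the constructor above, and the difference matters for `:debug`. The following is a minimal standalone sketch of that logic; `resolve_options` and `DEFAULT_CHUNK_BYTES` are hypothetical names used for illustration and are not part of the library.

```ruby
# Hypothetical helper (not part of the library) mirroring how the
# constructor resolves :maxChunkSizeBytes and :debug.
DEFAULT_CHUNK_BYTES = 32 * 1024 * 1024 # 33_554_432 bytes

def resolve_options(opts = {})
  {
    # `||` works here: a missing (nil) value falls back to the default.
    split_size: opts[:maxChunkSizeBytes] || DEFAULT_CHUNK_BYTES,
    # A boolean needs an explicit nil check, because
    # `opts[:debug] || true` would turn an explicit `false` back into `true`.
    debug: opts[:debug].nil? ? true : opts[:debug]
  }
end

resolve_options({})             # default chunk size, debug enabled
resolve_options(debug: false)   # default chunk size, debug disabled
```

Passing `:debug => false` is therefore the only way to silence progress output; omitting the option leaves it enabled.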

Instance Attribute Details

#args ⇒ Object

Returns the value of attribute args.



# File 'lib/mongo_mapper_parallel.rb', line 18

def args
  @args
end

#command_class ⇒ Object

Returns the value of attribute command_class.



# File 'lib/mongo_mapper_parallel.rb', line 16

def command_class
  @command_class
end

#debug ⇒ Object

Returns the value of attribute debug.



# File 'lib/mongo_mapper_parallel.rb', line 19

def debug
  @debug
end

#javascript ⇒ Object

Returns the value of attribute javascript.



# File 'lib/mongo_mapper_parallel.rb', line 17

def javascript
  @javascript
end

#split_keys ⇒ Object (readonly)

Returns the value of attribute split_keys: the keys that divide the collection into chunks for the distributed computation.



# File 'lib/mongo_mapper_parallel.rb', line 15

def split_keys
  @split_keys
end

Instance Method Details

#advance(percentage) ⇒ MongoMapperParallel

If progress stalls, you can skip ahead by a percentage; the skipped keys are marked as `completed`.

Parameters:

  • percentage (Float)

    how far along you want to advance, a value between 0.0 and 1.0

Returns:

  • (MongoMapperParallel)

    the receiver (`self`), so calls can be chained.

# File 'lib/mongo_mapper_parallel.rb', line 145

def advance percentage
	if percentage.class != Float
		raise TypeError.new "Can only advance by a Float value."
	elsif percentage > 1.0 or percentage < 0.0
		raise RangeError.new "Can only advance by a Float between 0.0 and 1.0."
	end
	@split_keys[0..(@split_keys.length*percentage).to_i].each {|i| i.completed = true}
	self
end
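
The slice in the listing above uses an inclusive range, so the number of keys marked is one more than `(length * percentage).to_i`, capped at the array length. The sketch below reproduces only that index arithmetic; `KeyStub` and `mark_completed` are hypothetical stand-ins, not library code.

```ruby
# Stand-in for MongoMapperParallel::Key; only the `completed` flag matters here.
KeyStub = Struct.new(:position, :completed)

# Hypothetical standalone version of the slice used by #advance.
def mark_completed(keys, percentage)
  last_index = (keys.length * percentage).to_i
  # 0..last_index is inclusive, so index `last_index` itself is also marked.
  keys[0..last_index].each { |key| key.completed = true }
  keys
end

keys = Array.new(10) { |i| KeyStub.new(i, false) }
mark_completed(keys, 0.5)
keys.count(&:completed) # 6: indices 0 through 5
```

One consequence worth knowing: advancing by `0.0` still marks the first key as completed, and an Integer argument such as `1` raises a TypeError because only Float values are accepted.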

#get_extreme_split_keys ⇒ Array<MongoMapperParallel::Key>

Fallback used when `splitVector` returns no split keys: builds a single key that starts at the first element of the collection and has no upper bound.

Returns:

  • (Array<MongoMapperParallel::Key>)

    the split keys, with the fallback key appended.

# File 'lib/mongo_mapper_parallel.rb', line 103

def get_extreme_split_keys
	first_split_key = get_first_split_key
	@split_keys << MongoMapperParallel::Key.new(:position => 0, :compiler => self, :key => first_split_key, :future_key => nil, :debug => @debug)
end

#get_first_split_key ⇒ Object

Obtains the first key: the value of the split field on the first document when the collection is ordered by that field.

Returns:

  • (Object)

    the first split key, or `nil` if the collection is empty.



# File 'lib/mongo_mapper_parallel.rb', line 84

def get_first_split_key
	@command_class.count > 0 ? @command_class.where().order(@split.to_sym).fields(@split.to_sym).first.send(@split.to_sym) : nil
end
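
As an illustration of what this query computes, here is a sketch over an in-memory array instead of a Mongo collection. It assumes the `order` call sorts ascending; `documents` and `first_split_key` are hypothetical names, not library code.

```ruby
# Hypothetical in-memory stand-in for the collection's documents.
documents = [{ name: "mango" }, { name: "apple" }, { name: "kiwi" }]
split = "name"

# Returns the smallest value of the split field, or nil for an empty collection.
def first_split_key(documents, split)
  return nil if documents.empty?
  field = split.to_sym
  documents.min_by { |doc| doc[field] }[field]
end

first_split_key(documents, split) # "apple"
first_split_key([], split)        # nil
```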

#get_split_keys ⇒ Array<MongoMapperParallel::Key>

Obtains the split keys that delimit the chunks to parallelize, using the MongoDB `splitVector` command.

Returns:

  • (Array<MongoMapperParallel::Key>)


# File 'lib/mongo_mapper_parallel.rb', line 91

def get_split_keys
	@split_keys, splits = [], @command_class.database.command({splitVector: "#{@command_class.database.name}.#{@command_class.collection.name}", keyPattern: {@split.to_sym => 1}, maxChunkSizeBytes: @splitSize })["splitKeys"]
	splits.unshift({@split.to_s => get_first_split_key})
	splits.each_with_index do |split_key,k|
		@split_keys << MongoMapperParallel::Key.new(:position => k, :compiler => self, :key => split_key[@split.to_s], :future_key => (splits[k+1] ? splits[k+1][@split.to_s] : nil),:debug => @debug)
	end
	if @split_keys.length == 0 and @command_class.count > 0 then get_extreme_split_keys end
end
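
The pairing step in the listing above turns a flat list of split points into chunks, each bounded by its own key and the next one. The following sketch reproduces that step on plain hashes; the sample `splits` data and the `name` field are made up for illustration, and no database is involved.

```ruby
# Hypothetical sample of a "splitKeys" result for a collection split on `name`.
splits = [{ "name" => "g" }, { "name" => "p" }]
first_key = "a" # stands in for the result of get_first_split_key

# Prepend the first key so the first chunk starts at the beginning of the collection.
splits.unshift({ "name" => first_key })

# Pair each split point with the next one; the last chunk has no upper bound.
ranges = splits.each_with_index.map do |split_key, k|
  following = splits[k + 1]
  { position: k, key: split_key["name"], future_key: following && following["name"] }
end

ranges.map { |r| [r[:key], r[:future_key]] }
# [["a", "g"], ["g", "p"], ["p", nil]]
```

Two split points therefore produce three chunks, and the final chunk is open-ended because its `future_key` is `nil`.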

#run ⇒ Object

Starts the parallel processing using Parallel.



# File 'lib/mongo_mapper_parallel.rb', line 132

def run
	total = @split_keys.length
	Parallel.each_with_index(@split_keys) do |section,k|
		if !section.completed then section.compile end
		JRProgressBar.show(k,total) if @debug
	end
	puts "Success".green if @debug
end
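
The skip-if-completed pattern in the listing above is what makes a stalled run resumable. Below is a minimal sketch of that pattern using standard-library threads as a stand-in for the Parallel call; `SectionStub` is a hypothetical replacement for MongoMapperParallel::Key, and its `compile` only records that it ran.

```ruby
# Stand-in for MongoMapperParallel::Key (assumption: the real #compile
# performs the actual work for one chunk).
class SectionStub
  attr_reader :compiled
  attr_accessor :completed

  def initialize(completed)
    @completed = completed
    @compiled = false
  end

  def compile
    @compiled = true
  end
end

sections = [SectionStub.new(true), SectionStub.new(false), SectionStub.new(false)]

# One thread per section; sections already marked completed are skipped.
threads = sections.map do |section|
  Thread.new { section.compile unless section.completed }
end
threads.each(&:join)

sections.map(&:compiled) # [false, true, true]
```

Each thread touches only its own section, so no locking is needed in this sketch.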