Class: Skynet::Job
- Inherits:
- Object
- Object
- Skynet::Job
- Includes:
- GuidGenerator, SkynetDebugger
- Defined in:
- lib/skynet/skynet_job.rb,
lib/skynet/skynet_tuplespace_server.rb
Overview
Skynet::Job is the main interface to Skynet. You create a job object giving it the starting data (map_data), along with what class has the map/reduce functions in it. Even though Skynet is distributed, when you call #run on a plain Skynet::Job, it will still block in your current process until it has completed your task. If you want to go on to do other things you’ll want to pass :async => true when creating a new job. Then later call job.results to retrieve your results.
There are also many global configuration options, which can be controlled through Skynet::CONFIG.
Example Usage: Create a file called mapreduce_test.rb with the following.
require 'ostruct' # OpenStruct is used for the sample map_data below

class MapreduceTest
  include SkynetDebugger ## This gives you logging methods such as log, error, info, fatal

  def self.run
    job = Skynet::Job.new(
      :mappers => 2,
      :reducers => 1,
      :map_reduce_class => self,
      :map_data => [OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 3})]
    )
    results = job.run
  end

  # Emits one [created_by, 1] pair per profile that has a creator.
  def self.map(profiles)
    result = Array.new
    profiles.each do |profile|
      result << [profile.created_by, 1] if profile.created_by
    end
    result
  end

  # Sums the counts for each created_by key.
  def self.reduce(pairs)
    totals = Hash.new
    pairs.each do |pair|
      created_by, count = pair[0], pair[1]
      totals[created_by] ||= 0
      totals[created_by] += count
    end
    totals
  end
end
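To see the data flow without a running Skynet, the same map and reduce logic can be exercised in plain Ruby. This is only a sketch: `LocalMapreduceSketch` and `run_locally` are hypothetical names, and the slicing used here stands in for whatever partitioning Skynet actually performs.

```ruby
require 'ostruct'

# Same map/reduce pair as the example, without any Skynet dependency.
class LocalMapreduceSketch
  def self.map(profiles)
    profiles.select(&:created_by).map { |profile| [profile.created_by, 1] }
  end

  def self.reduce(pairs)
    pairs.each_with_object(Hash.new(0)) do |(created_by, count), totals|
      totals[created_by] += count
    end
  end

  # Hypothetical stand-in for what a job does: split the input into
  # `mappers` slices, map each slice, then reduce the combined pairs.
  def self.run_locally(map_data, mappers)
    slice_size = [(map_data.size.to_f / mappers).ceil, 1].max
    mapped = map_data.each_slice(slice_size).flat_map { |slice| map(slice) }
    reduce(mapped)
  end
end

profiles = [2, 2, 3].map { |id| OpenStruct.new(:created_by => id) }
p LocalMapreduceSketch.run_locally(profiles, 2)
```

Creator 2 ends up with a count of 2 and creator 3 with a count of 1, which matches the totals the console example reports.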
You need to make sure Skynet is running with your class loaded. That is how Skynet works: since there is no easy way to pass code around the network, each Skynet worker must already have your code loaded. If Skynet is already started, stop it and then start it with the -r flag, which tells it which file to require in order to find your class.
$ skynet -r mapreduce_test.rb
Then go into the skynet console to test running your map reduce task.
$ skynet console -r mapreduce_test.rb
skynet>> MapreduceTest.run # returns {2=>2, 3=>1}
In the example above, you might notice that self.map and self.reduce both accept Arrays.
If you do not want to deal with getting arrays of map_data or reduce_data, you can include MapreduceHelper into your class and then implement self.map_each and self.reduce_each methods.
The included self.map and self.reduce methods will handle iterating over the map_data and reduce_data, passing each element to your map_each and reduce_each methods respectively. They will also handle error handling within that loop to make sure even if a single map or reduce fails, processing will continue.
If you do not want processing to continue if a map fails, do not use the MapreduceHelper mixin.
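The per-element behaviour described above can be sketched roughly as follows. `MapEachSketch` is a hypothetical module written for illustration; it is not the real MapreduceHelper, whose internals are not shown on this page, and it uses `extend` where the real helper is mixed in with `include`.

```ruby
# Hypothetical mixin, NOT the real MapreduceHelper: it only illustrates the
# per-element iteration and error isolation described above.
module MapEachSketch
  def map(map_data)
    map_data.each_with_object([]) do |item, results|
      begin
        mapped = map_each(item)
        results << mapped unless mapped.nil?
      rescue StandardError => e
        # One bad element is reported and skipped; the loop keeps going.
        warn "map_each failed for #{item.inspect}: #{e.message}"
      end
    end
  end
end

class WordLength
  extend MapEachSketch # extend, so `map` becomes a class method

  def self.map_each(word)
    raise ArgumentError, 'not a string' unless word.is_a?(String)
    [word, word.length]
  end
end

p WordLength.map(['sky', 42, 'net'])
```

The element 42 raises inside map_each, yet the other two elements are still mapped. That is the trade-off the paragraph above points out: failures are tolerated rather than stopping the job.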
Since Skynet must have your code, you will probably want to install skynet into the application that skynet needs access to in order to run your jobs. See bin/skynet_install for more info.
See new for the many other options to control various Skynet::Job settings.
Direct Known Subclasses
Defined Under Namespace
Classes: BadMapOrReduceError, Error, LocalMessageQueue, WorkerError
Constant Summary
- FIELDS =
[:queue_id, :mappers, :reducers, :silent, :name, :map_timeout, :map_data, :job_id, :reduce_timeout, :master_timeout, :map_name, :reduce_name, :master_result_timeout, :result_timeout, :start_after, :solo, :single, :version, :map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class, :master_retry, :map_retry, :reduce_retry, :keep_map_tasks, :keep_reduce_tasks, :local_master, :async, :data_debug ]
- @@worker_ver =
nil
Instance Attribute Summary
-
#data_debug ⇒ Object
Returns the value of attribute data_debug.
-
#use_local_queue ⇒ Object
Returns the value of attribute use_local_queue.
Class Method Summary
- .debug_class_desc ⇒ Object
- .mq ⇒ Object
-
.results_by_job_id(job_id, timeout = 2) ⇒ Object
Given a job_id, returns the results from the message queue.
Instance Method Summary
-
#async? ⇒ Boolean
async? is true only if the async flag is set and the job is not a ‘single’ job, is not in solo mode, and is not using a local master.
- #data_debug? ⇒ Boolean
- #display_info ⇒ Object
- #enqueue_messages(messages) ⇒ Object
- #gather_results(number_of_tasks, timeout = nil, description = nil) ⇒ Object
-
#initialize(options = {}) ⇒ Job
constructor
Most of the time you will merely call #new(options) and then #run on the returned object.
- #keep_map_tasks ⇒ Object
- #keep_reduce_tasks ⇒ Object
- #local_master? ⇒ Boolean
- #local_mq ⇒ Object
- #local_mq_reset! ⇒ Object
- #map=(map) ⇒ Object
- #map_data=(map_data) ⇒ Object
- #map_enqueue ⇒ Object
- #map_local? ⇒ Boolean
- #map_reduce_class=(klass) ⇒ Object
- #map_results(number_of_tasks) ⇒ Object
- #map_retry ⇒ Object
- #map_tasks ⇒ Object
- #master? ⇒ Boolean
- #master_enqueue ⇒ Object
- #master_results ⇒ Object
- #master_retry ⇒ Object
- #master_task ⇒ Object
- #mq ⇒ Object
- #partition_data(post_map_data) ⇒ Object
- #reduce=(reduce) ⇒ Object
- #reduce_enqueue(partitioned_data) ⇒ Object
- #reduce_local?(reduce_tasks) ⇒ Boolean
- #reduce_results(number_of_tasks) ⇒ Object
- #reduce_retry ⇒ Object
- #reduce_tasks(partitioned_data) ⇒ Object
- #reset! ⇒ Object
-
#results ⇒ Object
Returns the final results of this map/reduce job.
-
#run(options = {}) ⇒ Object
Options are:
:local_master
BOOL (DEFAULT true) By default, your Skynet::Job will act as the master for your map/reduce job, doling out tasks, waiting for other workers to complete and return their results and dealing with merging and partitioning the data.
- #run_master ⇒ Object
- #single? ⇒ Boolean
- #solo? ⇒ Boolean
- #start_after=(time) ⇒ Object
- #task_id ⇒ Object
- #tasks_to_messages(tasks) ⇒ Object
- #to_h ⇒ Object
- #use_local_queue? ⇒ Boolean
- #version ⇒ Object
- #version=(v) ⇒ Object
Methods included from GuidGenerator
Methods included from SkynetDebugger
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(options = {}) ⇒ Job
Most of the time you will merely call #new(options) and then #run on the returned object.
Options are:
:local_master
BOOL (DEFAULT true)
By default, your Skynet::Job will act as the master for your map/reduce job, doling out
tasks, waiting for other workers to complete and return their results and dealing with
merging and partitioning the data. If you call #run in async mode, another worker will handle
being the master for your job without blocking. If you run :async => false, :local_master => false
Skynet will let another worker be the master for your job, but will block waiting for the
final results. The benefit of this is that if your process dies, the Job will continue to
run remotely.
:async
BOOL (DEFAULT false)
If you run in async mode, another worker will handle being the master for your job without blocking.
You cannot pass :local_master => true, :async => true, since the only way to allow your
job to run asynchronously is to have a remote master.
:map_data
(Array or Enumerable)
map_data should be an Array or Enumerable of data that Skynet::Job will split up
and distribute among your workers. You can stream data to Skynet::Job by passing
an Enumerable that implements next or each.
:map_reduce_class
(Class or Class Name)
Skynet::Job will look for class methods named self.map, self.reduce, self.map_partitioner,
self.reduce_partition in your map_reduce_class. The only required method is self.map.
Each of these methods must accept an array. Examples above.
:map
(Class Name)
You can pass a classname, or a proc. If you pass a classname, Job will look for a method
called self.map in that class.
WARNING: Passing a proc does not work right now.
:reduce
(Class Name)
You can pass a classname, or a proc. If you pass a classname, Job will look for a method
called self.reduce in that class.
WARNING: Passing a proc does not work right now.
:reduce_partition
(Class Name)
You can pass a classname, or a proc. If you pass a classname, Job will look for a method
called self.reduce_partition in that class.
WARNING: Passing a proc does not work right now.
:mappers
Fixnum
The number of mappers to partition map data for.
:reducers
Fixnum
The number of reducers to partition the returned map_data for.
:master_retry
Fixnum
If the master fails for any reason, how many times should it be retried? You can also set
Skynet::CONFIG[:DEFAULT_MASTER_RETRY] (DEFAULT 0)
:map_retry
Fixnum
If a map task fails for any reason, how many times should it be retried? You can also set
Skynet::CONFIG[:DEFAULT_MAP_RETRY] (DEFAULT 3)
:reduce_retry
Fixnum
If a reduce task fails for any reason, how many times should it be retried? You can also set
Skynet::CONFIG[:DEFAULT_REDUCE_RETRY] (DEFAULT 3)
:master_timeout, :map_timeout, :reduce_timeout, :master_result_timeout, :result_timeout
These control how long skynet should wait for particular actions to be finished.
The master_timeout controls how long the master should wait for ALL map/reduce tasks, i.e. the entire job, to finish.
The master_result_timeout controls how long the final result should wait in the queue before being expired.
The map and reduce timeouts control how long individual map and reduce tasks should take.
:single
BOOL
By default the master task distributes the map and reduce tasks to other workers.
In single mode the master will take care of the map and reduce tasks by itself.
This is handy when you really want to just perform some single action asynchronously.
In this case you're merely using Skynet to postpone some action. In single mode, the
first worker that picks up your task will just complete it as opposed to trying to distribute
it to another worker.
:start_after
Time or Time.to_i
Do not start job until :start_after has passed
:queue
String
Which queue should this Job go in to? The queue provided is merely used to
determine the queue_id.
Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
:queue_id
Fixnum (DEFAULT 0)
Which queue should this Job go in to?
Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
:solo
BOOL
One normally turns solo mode on globally with Skynet::CONFIG[:SOLO] = true.
In solo mode, Skynet jobs do not add items to a Skynet queue. Instead they do all
work in place. It's like a Skynet simulation mode. It will complete all tasks
without Skynet running. Great for testing. You can also wrap code blocks in
Skynet.solo {} to run that code in solo mode.
:version
Fixnum
If you do not provide a version the current worker version will be used.
Skynet workers start at a specific version and only look for jobs that match that version.
A worker will continue looking for jobs at that version until there are no more jobs left on
the queue for that version. At that time, the worker will check to see if there is a new version.
If there is, it will restart itself at the new version (assuming you had already pushed code to
said workers.)
To retrieve the current version, set the current version or increment the current version, see
Skynet::Job.set_worker_version, Skynet::Job.get_worker_version, Skynet::Job.increment_worker_version
:name, :map_name, :reduce_name
These name methods are merely for debugging while watching the Skynet logs or the Skynet queue.
If you do not supply names, it will try and provide sensible ones based on your class names.
:keep_map_tasks
BOOL or Fixnum (DEFAULT 1)
If true, the master will run the map_tasks locally.
If a number is provided, the master will run the map_tasks locally if there are
LESS THAN OR EQUAL TO the number provided.
You may also set Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS] DEFAULT 1
:keep_reduce_tasks
BOOL or Fixnum (DEFAULT 1)
If true, the master will run the reduce_tasks locally.
If a number is provided, the master will run the reduce_tasks locally if there are
LESS THAN OR EQUAL TO the number provided.
You may also set Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS] DEFAULT 1
# File 'lib/skynet/skynet_job.rb', line 236
def initialize(options = {})
  FIELDS.each do |field|
    if options.has_key?(field)
      self.send("#{field}=".to_sym,options[field])
    elsif Skynet::CONFIG[:JOB_DEFAULTS][field]
      self.send("#{field}=".to_sym,Skynet::CONFIG[:JOB_DEFAULTS][field])
    end
    if options[:queue]
      raise Error.new("The provided queue (#{options[:queue]}) does not exist in Skynet::CONFIG[:MESSAGE_QUEUES]") unless Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
      self.queue_id = Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
    end
    # Backward compatability
    self.mappers ||= options[:map_tasks]
    self.reducers ||= options[:reduce_tasks]
  end
  raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master
  @job_id = task_id
end
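The precedence the constructor applies (explicit option first, then the job defaults, plus the translation of a :queue name into a queue_id) can be sketched in isolation. `CONFIG_SKETCH` and `resolve_job_options` are hypothetical names with made-up values, and the sketch returns a Hash instead of calling setters on a job.

```ruby
# Hypothetical configuration values, for illustration only.
CONFIG_SKETCH = {
  :JOB_DEFAULTS   => { :mappers => 2, :reducers => 1 },
  :MESSAGE_QUEUES => ['default', 'priority']
}

# Resolve job settings: explicit options win, then JOB_DEFAULTS.
# A :queue name is translated into its index in :MESSAGE_QUEUES.
def resolve_job_options(options, fields = [:mappers, :reducers, :queue_id])
  resolved = {}
  fields.each do |field|
    if options.has_key?(field)
      resolved[field] = options[field]
    elsif CONFIG_SKETCH[:JOB_DEFAULTS][field]
      resolved[field] = CONFIG_SKETCH[:JOB_DEFAULTS][field]
    end
  end
  if options[:queue]
    queue_id = CONFIG_SKETCH[:MESSAGE_QUEUES].index(options[:queue])
    raise ArgumentError, "The provided queue (#{options[:queue]}) does not exist" unless queue_id
    resolved[:queue_id] = queue_id
  end
  resolved
end

p resolve_job_options({ :mappers => 4, :queue => 'priority' })
```

Here :mappers comes from the caller, :reducers from the defaults, and :queue_id from the position of 'priority' in the queue list. An unknown queue name raises instead of silently falling back to queue 0.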
Instance Attribute Details
#data_debug ⇒ Object
Returns the value of attribute data_debug.
# File 'lib/skynet/skynet_job.rb', line 99
def data_debug
  @data_debug
end
#use_local_queue ⇒ Object
Returns the value of attribute use_local_queue.
# File 'lib/skynet/skynet_job.rb', line 99
def use_local_queue
  @use_local_queue
end
Class Method Details
.debug_class_desc ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 114
def self.debug_class_desc
  "JOB"
end
.mq ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 727
def self.mq
  Skynet::MessageQueue.new
end
.results_by_job_id(job_id, timeout = 2) ⇒ Object
Given a job_id, returns the results from the message queue. Used to retrieve results of asynchronous jobs.
# File 'lib/skynet/skynet_job.rb', line 423
def self.results_by_job_id(job_id,timeout=2)
   = mq.take_result(job_id,timeout)
  result = .payload
  return nil unless result
  return result
end
Instance Method Details
#async? ⇒ Boolean
async? is true only if the async flag is set and the job is not a ‘single’ job, is not in solo mode, and is not using a local master. async only applies to whether we run the master locally and whether we poll for the result.
# File 'lib/skynet/skynet_job.rb', line 600
def async?
  @async and not (solo? or single? or local_master?)
end
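How async? interacts with the other mode predicates is easier to see with a few concrete flag combinations. The following is a sketch only: `JobFlagsSketch` is a hypothetical struct whose predicate bodies follow the async?, local_master?, master? and solo? listings on this page, leaving out the global CONFIG[:SOLO] lookup.

```ruby
# Hypothetical flag holder; not part of Skynet.
JobFlagsSketch = Struct.new(:async, :local_master, :solo, :single) do
  def solo?;         !!solo;   end
  def single?;       !!single; end
  def local_master?; !!(local_master || solo?); end
  def async?;        !!(async && !(solo? || single? || local_master?)); end
  def master?;       async? || !local_master?; end
end

remote_async = JobFlagsSketch.new(true, false, false, false)
local_sync   = JobFlagsSketch.new(false, true, false, false)
solo_async   = JobFlagsSketch.new(true, false, true, false)

p [remote_async.async?, remote_async.master?]
p [local_sync.async?, local_sync.master?]
p [solo_async.async?, solo_async.master?]
```

Only the first combination is effectively asynchronous. In solo mode the async flag is ignored because solo implies a local master.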
#data_debug? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 620
def data_debug?
  @data_debug || Skynet::CONFIG[:SKYNET_JOB_DEBUG_DATA_LEVEL]
end
#display_info ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 656
def display_info
  "#{name}, job_id: #{job_id}"
end
#enqueue_messages(messages) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 412
def ()
  size = .size
  .each_with_index do |,ii|
    timeout = .expiry || 5
    debug "RUN TASKS SUBMITTING #{.name} job_id: #{job_id} #{.payload.is_a?(Skynet::Task) ? 'task' + .payload.task_id.to_s : ''}"
    debug "RUN TASKS WORKER MESSAGE #{.name} job_id: #{job_id}", .to_a
    mq.(,timeout * 5)
  end
end
#gather_results(number_of_tasks, timeout = nil, description = nil) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 430
def gather_results(number_of_tasks, timeout=nil, description=nil)
  debug "GATHER RESULTS job_id: #{job_id} - NOT AN ASYNC JOB"
  results = {}
  errors = {}
  started_at = Time.now.to_i

  begin
    loop do
      # debug "LOOKING FOR RESULT MESSAGE TEMPLATE"
       = self.mq.take_result(job_id,timeout * 2)
      ret_result = .payload
      if .payload_type == :error
        errors[.task_id] = ret_result
        error "ERROR RESULT TASK #{.task_id} returned #{errors[.task_id].inspect}"
      else
        results[.task_id] = ret_result
        debug "RESULT returned TASKID: #{.task_id} #{results[.task_id].inspect}"
      end
      debug "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}"
      printlog "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}" if data_debug?
      break if (number_of_tasks - (results.keys + errors.keys).uniq.size) <= 0
    end
  rescue Skynet::RequestExpiredError => e
    local_mq_reset!
    error "A WORKER EXPIRED or ERRORED, #{description}, job_id: #{job_id}"
    if not errors.empty?
      raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
    else
      raise Skynet::RequestExpiredError.new("WORKER ERROR, A WORKER EXPIRED! Did not get results or even errors back from all workers!")
    end
  end
  local_mq_reset!

  # ==========
  # = FIXME Tricky one. Should we throw an exception if we didn't get all the results back, or should we keep going.
  # = Maybe this is another needed option.
  # ==========

  # if not (errors.keys - results.keys).empty?
  #   raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
  # end

  return nil if results.values.compact.empty?
  return results.values
end
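The collection loop in the listing above keeps taking messages until every task has reported either a result or an error. A reduced sketch of that bookkeeping, with a plain Array standing in for the message queue, might look like this. `ResultMessage` and `gather_sketch` are hypothetical names, and timeouts are left out entirely.

```ruby
# Hypothetical message type and queue; only the collection logic mirrors
# the listing above.
ResultMessage = Struct.new(:task_id, :payload_type, :payload)

def gather_sketch(queue, number_of_tasks)
  results = {}
  errors  = {}
  until (results.keys + errors.keys).uniq.size >= number_of_tasks
    message = queue.shift
    raise 'queue drained before every task reported back' if message.nil?
    if message.payload_type == :error
      errors[message.task_id] = message.payload
    else
      results[message.task_id] = message.payload
    end
  end
  # As in the listing: nil when nothing useful came back, else the payloads.
  return nil if results.values.compact.empty?
  results.values
end

queue = [
  ResultMessage.new(1, :result, [[2, 2]]),
  ResultMessage.new(2, :error,  'boom'),
  ResultMessage.new(3, :result, [[3, 1]])
]
p gather_sketch(queue, 3)
```

Note that the failed task counts toward completion but contributes nothing to the returned values. This corresponds to the behaviour the FIXME comment in the listing questions.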
#keep_map_tasks ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 571
def keep_map_tasks
  @keep_map_tasks || Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]
end
#keep_reduce_tasks ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 575
def keep_reduce_tasks
  @keep_reduce_tasks || Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS]
end
#local_master? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 608
def local_master?
  @local_master or solo?
end
#local_mq ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 723
def local_mq
  @local_mq ||= LocalMessageQueue.new
end
#local_mq_reset! ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 476
def local_mq_reset!
  if use_local_queue?
    local_mq.reset!
    self.use_local_queue=false
  end
end
#map=(map) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 669
def map=(map)
  reset!
  return unless map
  if map.class == String or map.class == Class
    @map = map.to_s
  elsif map.is_a?(Proc)
    @map = map
  else
    raise BadMapOrReduceError.new("#{self.class}.map accepts a class name or a proc. Got #{map}")
  end
end
#map_data=(map_data) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 664
def map_data=(map_data)
  reset!
  @map_data = map_data
end
#map_enqueue ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 324
def map_enqueue
  task_ids = []
  map_tasks = self.map_tasks
  self.use_local_queue = map_local?
  if map_tasks
    number_of_tasks = 0
    size = map_tasks.size - 1
    printlog "MESSAGES TO MAP ENQUEUE #{size}" if data_debug?
    map_tasks.each_with_index do |task,ii|
      printlog "#{size - ii} MAP TASKS LEFT TO ENQUEUE" if data_debug?
      number_of_tasks += 1
      ((task))
    end
  end
  return number_of_tasks
end
#map_local? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 579
def map_local?
  return true if solo? or single?
  return true if keep_map_tasks == true
  # error "RUN MAP 2.4 BEFORE MAP #{display_info} KEEPMT:#{keep_map_tasks} DKMT:#{Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]} MDCLASS: #{map_tasks.data.class} #{(map_tasks.data.is_a?(Array) ? map_tasks.data.size : '')}"
  return true if keep_map_tasks and map_tasks.data.is_a?(Array) and map_tasks.data.size <= keep_map_tasks
  return false
end
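The decision in the listing above, combined with the :keep_map_tasks option, boils down to a small rule that can be tried out on its own. `map_local_sketch?` is a hypothetical helper. Unlike the listing, it explicitly checks that the threshold is an Integer before comparing.

```ruby
# keep: true, false/nil, or an Integer threshold (the :keep_map_tasks option).
# data: the partitioned map data. solo/single: the job's mode flags.
def map_local_sketch?(keep, data, solo: false, single: false)
  return true if solo || single
  return true if keep == true
  return true if keep.is_a?(Integer) && data.is_a?(Array) && data.size <= keep
  false
end

p map_local_sketch?(1, [[1, 2, 3]])       # one partition, threshold 1
p map_local_sketch?(1, [[1, 2], [3]])     # two partitions, threshold 1
p map_local_sketch?(true, [[1], [2], [3]])
```

With the documented default of 1, a job whose data fits in a single map task is run by the master itself, while anything larger is handed out to other workers.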
#map_reduce_class=(klass) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 693
def map_reduce_class=(klass)
  reset!
  unless klass.class == String or klass.class == Class
    raise BadMapOrReduceError.new("#{self.class}.map_reduce only accepts a class name: #{klass} #{klass.class}")
  end
  klass = klass.to_s
  @map = klass
  self.name ||= "#{klass} MASTER"
  self.map_name ||= "#{klass} MAP"
  if klass.constantize.respond_to?(:reduce)
    @reduce ||= klass
    self.reduce_name ||= "#{klass} REDUCE"
  end
  @reduce_partitioner ||= klass if klass.constantize.respond_to?(:reduce_partition)
  @map_partitioner ||= klass if klass.constantize.respond_to?(:map_partitioner)
end
#map_results(number_of_tasks) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 341
def map_results(number_of_tasks)
  debug "RUN MAP 2.4 BEFORE MAP #{display_info} MAP_LOCAL?:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?}"
  results = gather_results(number_of_tasks, map_timeout, map_name)
  return unless results
  results.compact! if results.is_a?(Array)
  debug "RUN MAP 2.5 RESULTS AFTER RUN #{display_info} MAP_LOCAL:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?} results:", results.inspect
  results
end
#map_retry ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 563
def map_retry
  @map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
end
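The `value || default` fallback used by the retry accessors has one subtlety worth seeing in action: in Ruby only nil and false trigger the fallback, so an explicit 0 is respected. The sketch below uses a hypothetical `DEFAULTS_SKETCH` hash in place of Skynet::CONFIG.

```ruby
# Hypothetical stand-in for Skynet::CONFIG.
DEFAULTS_SKETCH = { :DEFAULT_MAP_RETRY => 3 }

# Per-job value if one was given, otherwise the configured default.
def effective_map_retry(job_value)
  job_value || DEFAULTS_SKETCH[:DEFAULT_MAP_RETRY]
end

p effective_map_retry(nil) # falls back to the configured default
p effective_map_retry(0)   # 0 is truthy in Ruby, so "never retry" is kept
```

The same pattern appears in keep_map_tasks and keep_reduce_tasks. There it means that passing false for a job does not override a configured default, since false also falls through to the default.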
#map_tasks ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 497
def map_tasks
  @map_tasks ||= begin
    map_tasks = []
    debug "RUN MAP 2.1 #{display_info} data size before partition: #{@map_data.size}" if @map_data.respond_to?(:size)
    debug "RUN MAP 2.1 #{display_info} data before partition:", @map_data

     = {
      :process => @map,
      :name => map_name,
      :map_or_reduce => :map,
      :result_timeout => map_timeout,
      :retry => map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
    }

    if @map_data.is_a?(Array)
      debug "RUN MAP 2.2 DATA IS Array #{display_info}"
      num_mappers = @map_data.length < @mappers ? @map_data.length : @mappers

      map_data = if @map_partitioner
        @map_partitioner.call(@map_data,num_mappers)
      else
        Skynet::Partitioners::SimplePartitionData.reduce_partition(@map_data, num_mappers)
      end

      debug "RUN MAP 2.3 #{display_info} data size after partition: #{map_data.size}"
      debug "RUN MAP 2.3 #{display_info} map data after partition:", map_data
    elsif @map_data.is_a?(Enumerable)
      debug "RUN MAP 2.2 DATA IS ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
      map_data = @map_data
    else
      debug "RUN MAP 2.2 DATA IS NOT ARRAY OR ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
      map_data = [ @map_data ]
    end
    Skynet::TaskIterator.new(, map_data)
  end
end
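For Array input, the listing above first caps the number of mappers at the number of data items and then hands the data to a partitioner. The sketch below illustrates that idea with a simple round-robin split. `partition_sketch` is a hypothetical helper, and the real Skynet::Partitioners::SimplePartitionData may distribute items differently.

```ruby
# Hypothetical round-robin partitioner, for illustration only.
def partition_sketch(map_data, mappers)
  # Same cap as the listing: never more partitions than data items.
  num_mappers = [map_data.length, mappers].min
  return [] if num_mappers.zero?
  partitions = Array.new(num_mappers) { [] }
  map_data.each_with_index do |item, index|
    partitions[index % num_mappers] << item
  end
  partitions
end

p partition_sketch([:a, :b, :c], 2)
p partition_sketch([:a], 4)
```

Asking for four mappers with a single data item yields one partition, so no empty map tasks are created.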
#master? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 604
def master?
  async? or not local_master?
end
#master_enqueue ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 305
def master_enqueue
  self.use_local_queue = local_master?
   = ([master_task])
  ()
end
#master_results ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 320
def master_results
  @results = gather_results(1,master_timeout,name) unless defined?(@results)
end
#master_retry ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 559
def master_retry
  @master_retry || Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
end
#master_task ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 484
def master_task
  @master_task ||= begin
    raise Exception.new("No map provided") unless @map

    # Make sure to set single to false in our own Job object.
    # We're just passing along whether they set us to single.
    # If we were single, we'd never send off the master to be run externally.
    @single = false

    task = Skynet::Task.master_task(self)
  end
end
#mq ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 715
def mq
  if use_local_queue?
    local_mq
  else
    @mq ||= Skynet::MessageQueue.new
  end
end
#partition_data(post_map_data) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 350
def partition_data(post_map_data)
  info "RUN REDUCE 3.1 BEFORE PARTITION #{display_info} reducers: #{reducers}"
  debug "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data
  printlog "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data if data_debug?
  return unless post_map_data
  partitioned_data = nil
  if not @reduce_partition
    # =====================
    # = XXX HACK
    # = There was a bug in Job where the reduce_partition of master jobs wasn't being set! This is to catch that.
    # = It handles it by checking if the map class has a reduce partitioner. Maybe this is a good thing to leave anyway.
    # =====================
    if @map.is_a?(String) and @map.constantize.respond_to?(:reduce_partition)
      partitioned_data = @map.constantize.reduce_partition(post_map_data, reducers)
    else
      partitioned_data = Skynet::Partitioners::RecombineAndSplit.reduce_partition(post_map_data, reducers)
    end
  elsif @reduce_partition.is_a?(String)
    partitioned_data = @reduce_partition.constantize.reduce_partition(post_map_data, reducers)
  else
    partitioned_data = @reduce_partition.call(post_map_data, reducers)
  end
  partitioned_data.compact! if partitioned_data
  info "RUN REDUCE 3.2 AFTER PARTITION #{display_info} reducers: #{reducers}"
  debug "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if partitioned_data
  printlog "RUN REDUCE 3.2 AFTER PARTITION #{display_info} data:", partitioned_data if data_debug?
  partitioned_data
end
#reduce=(reduce) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 681
def reduce=(reduce)
  reset!
  return unless reduce
  if reduce.class == String or reduce.class == Class
    @reduce = reduce.to_s
  elsif reduce.is_a?(Proc)
    @reduce = reduce
  else
    raise BadMapOrReduceError.new("#{self.class}.reduce accepts a class name or a proc. Got #{reduce}")
  end
end
#reduce_enqueue(partitioned_data) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 379
def reduce_enqueue(partitioned_data)
  return partitioned_data unless @reduce and reducers and reducers > 0
  debug "RUN REDUCE 3.3 CREATED REDUCE TASKS #{display_info}", partitioned_data
  size = partitioned_data.size
  printlog "REDUCE MESSAGES TO ENQUEUE #{size}" if data_debug?
  reduce_tasks = self.reduce_tasks(partitioned_data)
  self.use_local_queue = reduce_local?(reduce_tasks)
  number_of_tasks = 0
  reduce_tasks.each_with_index do |task,ii|
    printlog "#{size - ii} REDUCE TASKS LEFT TO ENQUEUE" if data_debug?
    number_of_tasks += 1
    ((task))
  end
  return number_of_tasks
end
#reduce_local?(reduce_tasks) ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 587
def reduce_local?(reduce_tasks)
  return true if solo? or single?
  return true if keep_reduce_tasks == true
  return true if keep_reduce_tasks and reduce_tasks.data.is_a?(Array) and reduce_tasks.data.size <= keep_reduce_tasks
  return false
end
#reduce_results(number_of_tasks) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 396
def reduce_results(number_of_tasks)
  results = gather_results(number_of_tasks, reduce_timeout, reduce_name)
  printlog "REDUCE RESULTS", results if data_debug?
  if results.is_a?(Array) and results.first.is_a?(Array)
    final = []
    results.each do |result|
      final += result
    end
    results = final
  end
  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results size: #{results ? results.size : ''}"
  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results:", results if results
  printlog "POST REDUCE RESULTS", results if data_debug?
  return results
end
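The merging step in the listing above concatenates the per-reducer results only when they are Arrays. A standalone sketch of that rule, using the hypothetical helper name `merge_reduce_results`, could look like this.

```ruby
# Merge per-reducer results: arrays are concatenated one level deep,
# anything else (for example an Array of Hashes) is returned untouched.
def merge_reduce_results(results)
  return results unless results.is_a?(Array) && results.first.is_a?(Array)
  results.reduce([]) { |final, result| final + result }
end

p merge_reduce_results([[[2, 2]], [[3, 1]]])
p merge_reduce_results([{ 2 => 2 }, { 3 => 1 }])
```

This suggests that a reduce method returning Hashes, like the one in the overview example, hands back one Hash per reducer rather than a single merged Hash. Only Array results are flattened into one list.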
#reduce_retry ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 567
def reduce_retry
  @reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
end
#reduce_tasks(partitioned_data) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 534
def reduce_tasks(partitioned_data)
  @reduce_tasks ||= begin
     = {
      :name => reduce_name,
      :process => @reduce,
      :map_or_reduce => :reduce,
      :result_timeout => reduce_timeout,
      :retry => reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
    }
    Skynet::TaskIterator.new(, partitioned_data)
  end
end
#reset! ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 624
def reset!
  @map_tasks = nil
  @reduce_tasks = nil
end
#results ⇒ Object
Returns the final results of this map/reduce job. If results is called on an :async job, it will block until results are found or the master_timeout is reached.
# File 'lib/skynet/skynet_job.rb', line 313
def results
  # ============================================
  # = FIXME Maybe this can have better warnings if the results aren't ready yet. =
  # ============================================
  master_results
end
#run(options = {}) ⇒ Object
Options are:
:local_master
BOOL (DEFAULT true)
By default, your Skynet::Job will act as the master for your map/reduce job, doling out
tasks, waiting for other workers to complete and return their results and dealing with
merging and partitioning the data. If you run in async mode, another worker will handle
being the master for your job without blocking. If you run :async => false, :local_master => false
Skynet will let another worker be the master for your job, but will block waiting for the
final results. The benefit of this is that if your process dies, the Job will continue to
run remotely.
:async
BOOL (DEFAULT false)
If you run in async mode, another worker will handle being the master for your job without blocking.
You cannot pass :local_master => true, :async => true, since the only way to allow your
job to run asynchronously is to have a remote master.
You can pass any options you might pass to Skynet::Job.new. Warning: Passing options to run
will permanently change properties of the job.
# File 'lib/skynet/skynet_job.rb', line 275
def run(options = {})
  FIELDS.each do |field|
    if options.has_key?(field)
      self.send("#{field}=".to_sym,options[field])
    end
  end
  raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master

  info "RUN 1 BEGIN #{name}, job_id:#{job_id} vers: #{version} async:#{async}, local_master: #{local_master}, master?: #{master?}"

  # run the master task if we're running async or local_master
  if master?
    master_enqueue
    # ====================================================================================
    # = FIXME If async Return a handle to an object that can used to retrieve the results later.
    # ====================================================================================
    async? ? job_id : master_results
  else
    number_of_tasks_queued = self.map_enqueue
    map_results = self.map_results(number_of_tasks_queued)
    return map_results unless map_results and self.reduce
    partitioned_data = self.partition_data(map_results)
    return unless partitioned_data
    number_of_tasks_queued = self.reduce_enqueue(partitioned_data)
    @results = self.reduce_results(number_of_tasks_queued)
  end
end
#run_master ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 710
def run_master
  error "run_master has been deprecated, please use run"
  run(:local_master => false)
end
#single? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 616
def single?
  @single
end
#solo? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 612
def solo?
  (@solo or CONFIG[:SOLO])
end
#start_after=(time) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 660
def start_after=(time)
  @start_after = (time.is_a?(Time) ? time.to_i : time)
end
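The setter above accepts either form documented for :start_after (a Time or its integer value) and always stores the integer. A minimal sketch of that normalisation, using the hypothetical helper name `normalize_start_after`:

```ruby
# Accepts either a Time or an epoch value and returns epoch seconds,
# following the start_after= listing above.
def normalize_start_after(time)
  time.is_a?(Time) ? time.to_i : time
end

run_at = Time.at(1_700_000_000)
p normalize_start_after(run_at)        # a Time is converted to an Integer
p normalize_start_after(1_700_000_000) # an Integer passes through unchanged
```

Storing the value as an integer keeps it easy to serialise along with the rest of the job's fields.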
#task_id ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 640
def task_id
  @task_id ||= get_unique_id(1).to_i
end
#tasks_to_messages(tasks) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 547
def (tasks)
  if tasks.is_a?(Skynet::TaskIterator)
    tasks = tasks.to_a
  elsif not tasks.is_a?(Array)
    tasks = [tasks]
  end
  tasks.collect do |task|
    Skynet::Message.(task,self)
  end
end
#to_h ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 629
def to_h
  if @map.kind_of?(Proc) or @reduce.kind_of?(Proc)
    raise Skynet::Error.new("You have a Proc in your map or reduce. This can't be turned into a hash.")
  end
  hash = {}
  FIELDS.each do |field|
    hash[field] = self.send(field) if self.send(field)
  end
  hash
end
#use_local_queue? ⇒ Boolean
# File 'lib/skynet/skynet_job.rb', line 594
def use_local_queue?
  @use_local_queue
end
#version ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 644
def version
  return 1 if solo?
  @version ||= begin
    @@worker_version ||= self.mq.get_worker_version || 1
    @@worker_version
  end
end
#version=(v) ⇒ Object
# File 'lib/skynet/skynet_job.rb', line 652
def version=(v)
  @version = v
end