Class: Skynet::Job

Inherits:
Object
Includes:
GuidGenerator, SkynetDebugger
Defined in:
lib/skynet/skynet_job.rb,
lib/skynet/skynet_tuplespace_server.rb

Overview

Skynet::Job is the main interface to Skynet. You create a job object, giving it the starting data (map_data) along with the class that defines the map/reduce functions. Even though Skynet is distributed, calling #run on a plain Skynet::Job blocks your current process until the task has completed. If you want to go on to do other things, pass :async => true when creating the job, then call job.results later to retrieve your results.

There are also many global configuration options, which can be controlled through Skynet::CONFIG.

Example Usage: Create a file called mapreduce_test.rb with the following.

require 'ostruct'   ## OpenStruct is used for the sample map_data below

class MapreduceTest
  include SkynetDebugger   ## This gives you logging methods such as log, error, info, fatal

  def self.run
    job = Skynet::Job.new(
      :mappers          => 2, 
      :reducers         => 1,
      :map_reduce_class => self,
      :map_data         => [OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 2}),OpenStruct.new({:created_by => 3})]
    )    
    results = job.run
  end

  def self.map(profiles)
    result = Array.new
    profiles.each do |profile|
      result << [profile.created_by, 1] if profile.created_by
    end
    result
  end

  def self.reduce(pairs)
    totals = Hash.new
    pairs.each do |pair|
      created_by, count = pair[0], pair[1]
      totals[created_by] ||= 0
      totals[created_by] += count
    end
    totals
  end
end

You need to make sure Skynet is running with your class loaded. That is how Skynet works: since there is no easy way to pass code around the network, each Skynet worker must already have your code loaded. If Skynet is already running, stop it and restart it with the -r flag, which tells it which file to require for your class.

$ skynet -r mapreduce_test.rb

Then go into the skynet console to test running your map reduce task.

$ skynet console -r mapreduce_test.rb
skynet>> MapreduceTest.run     # returns {2=>2, 3=>1}
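Because MapreduceTest.map and MapreduceTest.reduce are plain class methods, you can sanity-check their logic by calling them directly, without Skynet running. Solo mode, described under the constructor options, is the supported way to run a whole job in-process. The sketch below copies the two methods from the example and chains them by hand, passing all map output to a single reduce call. It does not model Skynet's partitioning, so it only checks your functions, not the job. Profile is a hypothetical Struct standing in for the OpenStruct records.

```ruby
# Profile stands in for the OpenStruct records in the example; only created_by is needed.
Profile = Struct.new(:created_by)

# Copies of the map and reduce methods from MapreduceTest above.
class MapreduceTest
  def self.map(profiles)
    result = []
    profiles.each do |profile|
      result << [profile.created_by, 1] if profile.created_by
    end
    result
  end

  def self.reduce(pairs)
    totals = {}
    pairs.each do |created_by, count|
      totals[created_by] ||= 0
      totals[created_by] += count
    end
    totals
  end
end

profiles = [Profile.new(2), Profile.new(2), Profile.new(3)]

# One map call over all the data, then one reduce call over every pair.
pairs  = MapreduceTest.map(profiles)   # [[2, 1], [2, 1], [3, 1]]
totals = MapreduceTest.reduce(pairs)   # {2 => 2, 3 => 1}
```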

In the example above, you might notice that self.map and self.reduce both accept Arrays.

If you do not want to deal with getting arrays of map_data or reduce_data, you can include MapreduceHelper into your class and then implement self.map_each and self.reduce_each methods.

The included self.map and self.reduce methods iterate over the map_data and reduce_data and pass each element to your map_each and reduce_each methods, respectively. They also rescue errors inside that loop, so processing continues even if a single map or reduce fails.

If you do not want processing to continue if a map fails, do not use the MapreduceHelper mixin.
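The per-element iteration described above can be approximated with a small mixin. The sketch below uses a hypothetical stand-in named EachHelper, not Skynet's actual MapreduceHelper. Its map and reduce class methods call your map_each or reduce_each once per element and rescue per-element errors, so the rest of the batch keeps going. How the real helper logs errors or shapes its return value is not shown in this document.

```ruby
# Hypothetical stand-in for MapreduceHelper; not Skynet's implementation.
module EachHelper
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Calls map_each once per element; a failing element is skipped.
    def map(map_data)
      each_with_rescue(map_data) { |item| map_each(item) }
    end

    # Calls reduce_each once per element; a failing element is skipped.
    def reduce(reduce_data)
      each_with_rescue(reduce_data) { |item| reduce_each(item) }
    end

    private

    # Collects the block's result for each item, warning about and skipping failures.
    def each_with_rescue(items)
      results = []
      items.each do |item|
        begin
          results << yield(item)
        rescue StandardError => e
          warn "skipping #{item.inspect}: #{e.message}"
        end
      end
      results
    end
  end
end

# Hypothetical job class that only implements map_each.
class SquareJob
  include EachHelper

  def self.map_each(n)
    raise ArgumentError, "negative input" if n.negative?
    [n, n * n]
  end
end

squares = SquareJob.map([1, -2, 3])  # [[1, 1], [3, 9]]; -2 is skipped with a warning
```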

Since every Skynet worker must have your code loaded, you will probably want to install Skynet into the application whose code your jobs need in order to run. See bin/skynet_install for more info.

See Skynet::Job.new (documented under Constructor Details) for the many other options that control Skynet::Job settings.

Direct Known Subclasses

AsyncJob

Defined Under Namespace

Classes: BadMapOrReduceError, Error, LocalMessageQueue, WorkerError

Constant Summary

FIELDS =
[:queue_id, :mappers, :reducers, :silent, :name, :map_timeout, :map_data, :job_id,
  :reduce_timeout, :master_timeout, :map_name, :reduce_name,
  :master_result_timeout, :result_timeout, :start_after, :solo, :single, :version,
  :map, :map_partitioner, :reduce, :reduce_partition, :map_reduce_class,
  :master_retry, :map_retry, :reduce_retry,
  :keep_map_tasks, :keep_reduce_tasks,
  :local_master, :async, :data_debug
]
@@worker_ver =
nil

Methods included from GuidGenerator

#get_unique_id

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Constructor Details

#initialize(options = {}) ⇒ Job

Most of the time you will merely call #new(options) and then #run on the returned object.

Options are:

:local_master BOOL (DEFAULT true)

By default, your Skynet::Job acts as the master for your map/reduce job: it doles out
tasks, waits for other workers to complete and return their results, and merges and
partitions the data. If you call #run in async mode, another worker acts as the master
for your job and #run returns the job_id without blocking. If you run with
:async => false, :local_master => false, Skynet will let another worker be the master
for your job but will block waiting for the final results. The benefit of this is that
if your process dies, the job will continue to run remotely.

:async BOOL (DEFAULT false)

If you run in async mode, another worker will act as the master for your job, without blocking.
You cannot pass :local_master => true, :async => true, since the only way to allow your
job to run asynchronously is to have a remote master.

:map_data (Array or Enumerable)

map_data should be an Array or Enumerable containing the data that Skynet::Job will split up
and distribute among your workers. You can stream data to Skynet::Job by passing
an Enumerable that implements next or each.
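As a sketch of streaming input, the hypothetical class below includes Ruby's Enumerable and yields batches lazily from each, so it is not an Array. Judging from the map_tasks source further down, Skynet::Job passes such an object to Skynet::TaskIterator unchanged instead of partitioning it by :mappers (only Arrays are partitioned). How it is then chunked into tasks depends on TaskIterator, which this page does not show. You would pass it as `Skynet::Job.new(:map_data => NumberedBatches.new(3, 2), ...)`.

```ruby
# Hypothetical streaming source for :map_data; not part of Skynet.
class NumberedBatches
  include Enumerable

  def initialize(batch_count, batch_size)
    @batch_count = batch_count
    @batch_size  = batch_size
  end

  # Yields one batch (an Array) at a time instead of building all the data up front.
  def each
    return enum_for(:each) unless block_given?
    @batch_count.times do |i|
      start = i * @batch_size
      batch = (start...(start + @batch_size)).to_a
      yield batch
    end
  end
end

stream = NumberedBatches.new(3, 2)
stream.is_a?(Enumerable)  # true
stream.is_a?(Array)       # false, so Job#map_tasks would not partition it
stream.to_a               # [[0, 1], [2, 3], [4, 5]]
```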

:map_reduce_class (Class or Class Name)

Skynet::Job will look for class methods named self.map, self.reduce, self.map_partitioner,
and self.reduce_partition in your map_reduce_class. The only required method is self.map.
Each of these methods receives an array of data as its first argument. See the example above.
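The sketch below shows a hypothetical map_reduce_class that defines self.map, self.reduce and a custom self.reduce_partition. The reduce_partition signature (post_map_data, reducers) follows the call in partition_data further down. The shape of post_map_data is an assumption: one Array of [key, value] pairs per map task. The partitioner routes each pair by a checksum of its key, so every occurrence of a word reaches the same reducer, which a per-key sum relies on. It runs without Skynet.

```ruby
# Hypothetical map_reduce_class; the data shapes are assumptions, not Skynet's contract.
class WordCount
  # Each map call receives an Array of lines and emits [word, 1] pairs.
  def self.map(lines)
    lines.flat_map { |line| line.split.map { |word| [word, 1] } }
  end

  # post_map_data: one Array of [word, count] pairs per map task (assumed shape).
  # Returns one Array of pairs per reducer; equal words land in the same bucket.
  def self.reduce_partition(post_map_data, reducers)
    buckets = Array.new(reducers) { [] }
    post_map_data.each do |pairs|
      pairs.each { |word, count| buckets[word.sum % reducers] << [word, count] }
    end
    buckets
  end

  # Each reduce call receives one bucket and totals the counts per word.
  def self.reduce(pairs)
    pairs.each_with_object(Hash.new(0)) { |(word, count), totals| totals[word] += count }
  end
end

mapped  = [WordCount.map(["a b a"]), WordCount.map(["b c"])]
buckets = WordCount.reduce_partition(mapped, 2)
totals  = buckets.map { |bucket| WordCount.reduce(bucket) }
```

String#sum is a simple checksum, chosen here because it is stable across processes, unlike String#hash, which is seeded per process.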

:map(Class Name)

You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
called self.map in that class.
WARNING: Passing a proc does not work right now.

:reduce(Class Name)

You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
called self.reduce in that class.
WARNING: Passing a proc does not work right now.

:reduce_partition(Class Name)

You can pass a classname, or a proc.  If you pass a classname, Job will look for a method
called self.reduce_partition in that class.
WARNING: Passing a proc does not work right now.

:mappers Fixnum

The number of mappers to partition the map data across.

:reducers Fixnum

The number of reducers to partition the map results across.

:master_retry Fixnum

If the master fails for any reason, how many times should it be retried?  You can also set
Skynet::CONFIG[:DEFAULT_MASTER_RETRY] (DEFAULT 0)

:map_retry Fixnum

If a map task fails for any reason, how many times should it be retried?  You can also set
Skynet::CONFIG[:DEFAULT_MAP_RETRY] (DEFAULT 3)

:reduce_retry Fixnum

If a reduce task fails for any reason, how many times should it be retried?  You can also set
Skynet::CONFIG[:DEFAULT_REDUCE_RETRY] (DEFAULT 3)
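The constructor source and the map_retry/reduce_retry accessors together resolve a retry setting in this order: an explicit option passed to new, then Skynet::CONFIG[:JOB_DEFAULTS], then the matching Skynet::CONFIG[:DEFAULT_*_RETRY] value. The sketch models that lookup with a plain Hash named CONFIG standing in for Skynet::CONFIG, and its values are made up for illustration. In Ruby, 0 is truthy, so an explicit 0 is kept rather than falling back to the default.

```ruby
# Stand-in for Skynet::CONFIG; these values are illustrative only.
CONFIG = {
  JOB_DEFAULTS: { map_retry: 5 },
  DEFAULT_MAP_RETRY: 3,
  DEFAULT_REDUCE_RETRY: 3
}.freeze

# Mirrors the order used by Job#initialize followed by the retry accessors.
def resolve_retry(options, field, default_key)
  value = if options.key?(field)
            options[field]
          elsif CONFIG[:JOB_DEFAULTS][field]
            CONFIG[:JOB_DEFAULTS][field]
          end
  value || CONFIG[default_key]
end

resolve_retry({ map_retry: 0 }, :map_retry, :DEFAULT_MAP_RETRY)  # 0, the explicit option wins
resolve_retry({}, :map_retry, :DEFAULT_MAP_RETRY)                # 5, from JOB_DEFAULTS
resolve_retry({}, :reduce_retry, :DEFAULT_REDUCE_RETRY)          # 3, from DEFAULT_REDUCE_RETRY
```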

:master_timeout, :map_timeout, :reduce_timeout, :master_result_timeout, :result_timeout

These control how long Skynet should wait for particular actions to finish.
The master_timeout controls how long the master should wait for ALL map/reduce tasks, i.e. the entire job, to finish.
The master_result_timeout controls how long the final result should wait in the queue before being expired.
The map and reduce timeouts control how long individual map and reduce tasks should take.

:single BOOL

By default the master task distributes the map and reduce tasks to other workers.  
In single mode the master will take care of the map and reduce tasks by itself.
This is handy when you really just want to perform some single action asynchronously.
In this case you're merely using Skynet to postpone some action. In single mode, the
first worker that picks up your task will just complete it as opposed to trying to distribute
it to another worker.

:start_after Time or Time.to_i

Do not start the job until the :start_after time has passed. A Time is converted to an integer (seconds since the epoch) by start_after=.
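The conversion below mirrors Job#start_after=. The readiness check is hypothetical, because this page does not show how workers compare start_after against the clock. It only illustrates the intended "not before this time" rule.

```ruby
# Same conversion as Job#start_after=: a Time becomes epoch seconds.
def normalize_start_after(time)
  time.is_a?(Time) ? time.to_i : time
end

# Hypothetical readiness check; Skynet's actual worker-side check is not shown here.
def ready_to_start?(start_after, now = Time.now.to_i)
  start_after.nil? || now >= start_after
end

one_hour_later = normalize_start_after(Time.now + 3600)
ready_to_start?(one_hour_later)  # false until an hour has passed
```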

:queue String

Which queue should this Job go into? The queue name provided is only used to
determine the queue_id.
Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]

:queue_id Fixnum (DEFAULT 0)

Which queue should this Job go into?
Queues are defined in Skynet::CONFIG[:MESSAGE_QUEUES]
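The constructor turns :queue into :queue_id by looking up the name's index in Skynet::CONFIG[:MESSAGE_QUEUES] and raising if the name is missing. The sketch mirrors that lookup with a made-up queue list. It raises ArgumentError to stay self-contained, where Skynet raises Skynet::Job::Error. Index 0 is truthy in Ruby, so the first queue passes the existence check.

```ruby
# Illustrative stand-in for Skynet::CONFIG[:MESSAGE_QUEUES]; the queue names are made up.
MESSAGE_QUEUES = ["default", "reports", "mailers"].freeze

# Mirrors Job#initialize: the queue name's index in MESSAGE_QUEUES becomes the queue_id.
def queue_id_for(queue_name, queues = MESSAGE_QUEUES)
  queue_id = queues.index(queue_name)
  raise ArgumentError, "The provided queue (#{queue_name}) does not exist in MESSAGE_QUEUES" unless queue_id
  queue_id
end

queue_id_for("default")  # 0
queue_id_for("mailers")  # 2
```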

:solo BOOL

One normally turns solo mode on in Skynet::CONFIG using Skynet::CONFIG[:SOLO] = true.
In solo mode, Skynet jobs do not add items to a Skynet queue. Instead they do all
work in place.  It's like a Skynet simulation mode.  It will complete all tasks
without Skynet running.  Great for testing.  You can also wrap code blocks in
Skynet.solo {} to run that code in solo mode.
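The source names Skynet.solo but does not show how it works. The sketch below is a guess at the shape of such a block helper, using a hypothetical SoloSketch module. It turns a global flag on for the duration of the block and restores the previous value in an ensure clause, even if the block raises. solo? mirrors Job#solo?, which checks the per-job flag or the global setting. Thread safety is not handled.

```ruby
# Hypothetical stand-in for Skynet.solo; not Skynet's actual implementation.
module SoloSketch
  CONFIG = { SOLO: false }

  # Turns solo mode on for the duration of the block, then restores the old value.
  def self.solo
    previous = CONFIG[:SOLO]
    CONFIG[:SOLO] = true
    yield
  ensure
    CONFIG[:SOLO] = previous
  end

  # Same shape as Job#solo?: a per-job flag or the global setting.
  def self.solo?(job_solo = nil)
    job_solo || CONFIG[:SOLO]
  end
end

inside = SoloSketch.solo { SoloSketch.solo? }  # true inside the block
after  = SoloSketch.solo?                      # false again afterwards
```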

:version Fixnum

If you do not provide a version the current worker version will be used.
Skynet workers start at a specific version and only look for jobs that match that version.
A worker will continue looking for jobs at that version until there are no more jobs left on
the queue for that version.  At that time, the worker will check to see if there is a new version.
If there is, it will restart itself at the new version (assuming you had already pushed code to
said workers.)
To get, set, or increment the current worker version, see
Skynet::Job.get_worker_version, Skynet::Job.set_worker_version, and Skynet::Job.increment_worker_version.
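The worker behavior described above can be modeled as a small decision function. This is a hypothetical model, not Skynet's worker code. A worker pinned at worker_version takes any queued job at that version. When none are left, it restarts if a newer version has been published, and otherwise waits. QueuedJob and next_action are illustrative names.

```ruby
# Hypothetical model of the worker loop described above; not Skynet's code.
QueuedJob = Struct.new(:job_id, :version)

# Returns what a worker pinned at worker_version should do next.
def next_action(queue, worker_version, published_version)
  job = queue.find { |queued| queued.version == worker_version }
  return [:run, job] if job
  return [:restart, published_version] if published_version > worker_version
  [:wait, nil]
end

queue = [QueuedJob.new(1, 2), QueuedJob.new(2, 3)]
next_action(queue, 2, 3)         # runs job 1, which matches version 2
next_action([queue.last], 2, 3)  # [:restart, 3]: no version-2 jobs left
next_action([], 3, 3)            # [:wait, nil]
```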

:name, :map_name, :reduce_name

These name methods are merely for debugging while watching the Skynet logs or the Skynet queue.
If you do not supply names, it will try to provide sensible ones based on your class names.

:keep_map_tasks BOOL or Fixnum (DEFAULT 1)

If true, the master will run the map_tasks locally.
If a number is provided, the master will run the map_tasks locally if there are
LESS THAN OR EQUAL TO that many of them.
You may also set Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS] (DEFAULT 1).

:keep_reduce_tasks BOOL or Fixnum (DEFAULT 1)

If true, the master will run the reduce_tasks locally.
If a number is provided, the master will run the reduce_tasks locally if there are
LESS THAN OR EQUAL TO that many of them.
You may also set Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS] (DEFAULT 1).
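The decision made by Job#map_local? and Job#reduce_local? can be restated as one standalone function. Here partitions stands in for map_tasks.data or reduce_tasks.data, the data after partitioning. The numeric threshold only applies when that data is an Array, so a streamed Enumerable never qualifies for it.

```ruby
# Mirrors the checks in Job#map_local? and Job#reduce_local?.
# keep: true, false/nil, or an Integer threshold; partitions: the partitioned task data.
def run_tasks_locally?(keep, partitions, solo: false, single: false)
  return true if solo || single
  return true if keep == true
  return true if keep && partitions.is_a?(Array) && partitions.size <= keep
  false
end

run_tasks_locally?(1, [[1, 2, 3]])    # true: 1 partition <= 1
run_tasks_locally?(1, [[1, 2], [3]])  # false: 2 partitions > 1
run_tasks_locally?(true, [[1], [2]])  # true
run_tasks_locally?(nil, [[1]])        # false
```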

Raises:

  • (Error)

# File 'lib/skynet/skynet_job.rb', line 236

def initialize(options = {})
  FIELDS.each do |field|
    if options.has_key?(field)
      self.send("#{field}=".to_sym,options[field])
    elsif Skynet::CONFIG[:JOB_DEFAULTS][field]
      self.send("#{field}=".to_sym,Skynet::CONFIG[:JOB_DEFAULTS][field])
    end              
    if options[:queue]
      raise Error.new("The provided queue (#{options[:queue]}) does not exist in Skynet::CONFIG[:MESSAGE_QUEUES]") unless Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
      self.queue_id = Skynet::CONFIG[:MESSAGE_QUEUES].index(options[:queue])
    end
            
    # Backward compatability
    self.mappers  ||= options[:map_tasks]
    self.reducers ||= options[:reduce_tasks]        
  end                     

  raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master

  @job_id = task_id
end

Instance Attribute Details

#data_debug ⇒ Object

Returns the value of attribute data_debug.

# File 'lib/skynet/skynet_job.rb', line 99

def data_debug
  @data_debug
end

#use_local_queue ⇒ Object

Returns the value of attribute use_local_queue.

# File 'lib/skynet/skynet_job.rb', line 99

def use_local_queue
  @use_local_queue
end

Class Method Details

.debug_class_desc ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 114

def self.debug_class_desc
  "JOB"
end

.mq ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 727

def self.mq
  Skynet::MessageQueue.new
end

.results_by_job_id(job_id, timeout = 2) ⇒ Object

Given a job_id, returns the results from the message queue. Used to retrieve the results of asynchronous jobs.

# File 'lib/skynet/skynet_job.rb', line 423

def self.results_by_job_id(job_id,timeout=2)
  result_message = mq.take_result(job_id,timeout)
  result     = result_message.payload
  return nil unless result
  return result
end

Instance Method Details

#async? ⇒ Boolean

async? is true only if the async flag is set and the job is not a ‘single’ job, is not in solo mode, and does not have a local master. It only affects whether the master runs locally and whether we poll for the result.

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 600

def async?
  @async and not (solo? or single? or local_master?)
end
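The predicates async?, local_master? and master? decide which branch #run takes. When master? is true, #run enqueues a master task and, if async? is also true, returns the job_id. Otherwise the current process acts as master and runs the map and reduce phases itself. The sketch restates those three predicates as plain functions, with keyword arguments standing in for the job's flags.

```ruby
# Boolean models of Job#local_master?, Job#async? and Job#master?.
def local_master_job?(local_master:, solo: false)
  local_master || solo
end

def async_job?(async:, local_master:, solo: false, single: false)
  async && !(solo || single || local_master_job?(local_master: local_master, solo: solo))
end

def master_job?(async:, local_master:, solo: false, single: false)
  async_job?(async: async, local_master: local_master, solo: solo, single: single) ||
    !local_master_job?(local_master: local_master, solo: solo)
end

master_job?(async: false, local_master: true)   # false: this process runs map/reduce
master_job?(async: false, local_master: false)  # true: remote master, #run still blocks
master_job?(async: true,  local_master: false)  # true: remote master, #run returns job_id
```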

#data_debug? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 620

def data_debug?
  @data_debug || Skynet::CONFIG[:SKYNET_JOB_DEBUG_DATA_LEVEL]
end

#display_info ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 656

def display_info
  "#{name}, job_id: #{job_id}"
end

#enqueue_messages(messages) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 412

def enqueue_messages(messages)
  size = messages.size
  messages.each_with_index do |message,ii|
    timeout = message.expiry || 5
    debug "RUN TASKS SUBMITTING #{message.name} job_id: #{job_id} #{message.payload.is_a?(Skynet::Task) ? 'task' + message.payload.task_id.to_s : ''}"        
    debug "RUN TASKS WORKER MESSAGE #{message.name} job_id: #{job_id}", message.to_a
    mq.write_message(message,timeout * 5)
  end
end

#gather_results(number_of_tasks, timeout = nil, description = nil) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 430

def gather_results(number_of_tasks, timeout=nil, description=nil)
  debug "GATHER RESULTS job_id: #{job_id} - NOT AN ASYNC JOB"
  results = {}
  errors  = {}
  started_at = Time.now.to_i

  begin
    loop do
      # debug "LOOKING FOR RESULT MESSAGE TEMPLATE"
      result_message = self.mq.take_result(job_id,timeout * 2)
      ret_result     = result_message.payload

      if result_message.payload_type == :error
        errors[result_message.task_id] = ret_result
        error "ERROR RESULT TASK #{result_message.task_id} returned #{errors[result_message.task_id].inspect}"
      else
        results[result_message.task_id] = ret_result
        debug "RESULT returned TASKID: #{result_message.task_id} #{results[result_message.task_id].inspect}"
      end          
      debug "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}"
      printlog "RESULT collected: #{(results.keys + errors.keys).size}, remaining: #{(number_of_tasks - (results.keys + errors.keys).uniq.size)}" if data_debug?
      break if (number_of_tasks - (results.keys + errors.keys).uniq.size) <= 0
    end
  rescue Skynet::RequestExpiredError => e
    local_mq_reset!
    error "A WORKER EXPIRED or ERRORED, #{description}, job_id: #{job_id}"
    if not errors.empty?
      raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
    else
      raise Skynet::RequestExpiredError.new("WORKER ERROR, A WORKER EXPIRED!  Did not get results or even errors back from all workers!")
    end
  end
  local_mq_reset!

  # ==========
  # = FIXME Tricky one.  Should we throw an exception if we didn't get all the results back, or should we keep going.
  # = Maybe this is another needed option.
  # ==========      
  # if not (errors.keys - results.keys).empty?
  #   raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{number_of_tasks} workers. #{errors.pretty_print_inspect}")
  # end
  
  return nil if results.values.compact.empty?
  return results.values
end
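The collection loop above can be modeled in memory. The sketch is simplified: the real method blocks on mq.take_result with a timeout and raises WorkerError or RequestExpiredError when a worker expires, while here the messages are just an Array. As in the source, errors are tracked separately and count toward "every task has reported". Because the WorkerError check after the loop is commented out (the FIXME), a job whose tasks partly errored can return partial results. ResultMessage and collect_results are illustrative names.

```ruby
# Simplified, in-memory model of Job#gather_results; not Skynet's code.
ResultMessage = Struct.new(:task_id, :payload_type, :payload)

def collect_results(messages, number_of_tasks)
  results = {}
  errors  = {}
  messages.each do |message|
    bucket = message.payload_type == :error ? errors : results
    bucket[message.task_id] = message.payload
    # Stop once every task has reported either a result or an error.
    break if (results.keys | errors.keys).size >= number_of_tasks
  end
  values = results.values.compact.empty? ? nil : results.values
  [values, errors]
end

messages = [
  ResultMessage.new(101, :result, [[2, 1]]),
  ResultMessage.new(102, :error, "map failed"),
  ResultMessage.new(103, :result, [[3, 1]])
]
values, errors = collect_results(messages, 2)  # stops after task 102 reports
```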

#keep_map_tasks ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 571

def keep_map_tasks      
  @keep_map_tasks    || Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]
end

#keep_reduce_tasks ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 575

def keep_reduce_tasks
  @keep_reduce_tasks || Skynet::CONFIG[:DEFAULT_KEEP_REDUCE_TASKS]
end

#local_master? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 608

def local_master?
  @local_master or solo?
end

#local_mq ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 723

def local_mq
  @local_mq ||= LocalMessageQueue.new
end

#local_mq_reset! ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 476

def local_mq_reset!
  if use_local_queue?
    local_mq.reset!
    self.use_local_queue=false
  end
end

#map=(map) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 669

def map=(map)
  reset!
  return unless map
  if map.class == String or map.class == Class
    @map = map.to_s
  elsif map.is_a?(Proc)
    @map = map        
  else
    raise BadMapOrReduceError.new("#{self.class}.map accepts a class name or a proc. Got #{map}") 
  end      
end

#map_data=(map_data) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 664

def map_data=(map_data)
  reset!
  @map_data = map_data
end

#map_enqueue ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 324

def map_enqueue      
  task_ids             = []
  map_tasks            = self.map_tasks
  self.use_local_queue = map_local?
  if map_tasks        
    number_of_tasks = 0
    size = map_tasks.size - 1
    printlog "MESSAGES TO MAP ENQUEUE #{size}" if data_debug?
    map_tasks.each_with_index do |task,ii|
      printlog "#{size - ii} MAP TASKS LEFT TO ENQUEUE" if data_debug?
      number_of_tasks += 1
      enqueue_messages(tasks_to_messages(task))
    end            
  end
  return number_of_tasks
end

#map_local? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 579

def map_local?
  return true if solo? or single?
  return true if keep_map_tasks == true
  # error "RUN MAP 2.4 BEFORE MAP #{display_info} KEEPMT:#{keep_map_tasks} DKMT:#{Skynet::CONFIG[:DEFAULT_KEEP_MAP_TASKS]} MDCLASS: #{map_tasks.data.class} #{(map_tasks.data.is_a?(Array) ? map_tasks.data.size : '')}"
  return true if keep_map_tasks and map_tasks.data.is_a?(Array) and map_tasks.data.size <= keep_map_tasks
  return false
end

#map_reduce_class=(klass) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 693

def map_reduce_class=(klass)
  reset!
  unless klass.class == String or klass.class == Class
    raise BadMapOrReduceError.new("#{self.class}.map_reduce only accepts a class name: #{klass} #{klass.class}") 
  end
  klass = klass.to_s
  @map = klass
  self.name     ||= "#{klass} MASTER"
  self.map_name ||= "#{klass} MAP"
  if klass.constantize.respond_to?(:reduce)
    @reduce ||= klass 
    self.reduce_name ||= "#{klass} REDUCE"
  end
  @reduce_partitioner ||= klass if klass.constantize.respond_to?(:reduce_partition)
  @map_partitioner    ||= klass if klass.constantize.respond_to?(:map_partitioner)
end

#map_results(number_of_tasks) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 341

def map_results(number_of_tasks)
  debug "RUN MAP 2.4 BEFORE MAP #{display_info} MAP_LOCAL?:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?}"
  results = gather_results(number_of_tasks, map_timeout, map_name)
  return unless results
  results.compact! if results.is_a?(Array)
  debug "RUN MAP 2.5 RESULTS AFTER RUN #{display_info} MAP_LOCAL:#{map_local?} USE_LOCAL_QUEUE?:#{use_local_queue?} results:", results.inspect
  results
end

#map_retry ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 563

def map_retry
  @map_retry         || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
end

#map_tasks ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 497

def map_tasks
  @map_tasks ||= begin
    map_tasks = []
    debug "RUN MAP 2.1 #{display_info} data size before partition: #{@map_data.size}" if @map_data.respond_to?(:size)
    debug "RUN MAP 2.1 #{display_info} data before partition:", @map_data

    task_options = {
      :process        => @map,
      :name           => map_name,
      :map_or_reduce  => :map,
      :result_timeout => map_timeout,
      :retry          => map_retry || Skynet::CONFIG[:DEFAULT_MAP_RETRY]
    }

    if @map_data.is_a?(Array)
      debug "RUN MAP 2.2 DATA IS Array #{display_info}"
      num_mappers = @map_data.length < @mappers ? @map_data.length : @mappers

      map_data = if @map_partitioner
        @map_partitioner.call(@map_data,num_mappers)
      else
        Skynet::Partitioners::SimplePartitionData.reduce_partition(@map_data, num_mappers)
      end
    
      debug "RUN MAP 2.3 #{display_info} data size after partition: #{map_data.size}"
      debug "RUN MAP 2.3 #{display_info} map data after partition:", map_data
    elsif @map_data.is_a?(Enumerable)
      debug "RUN MAP 2.2 DATA IS ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
      map_data = @map_data
    else
      debug "RUN MAP 2.2 DATA IS NOT ARRAY OR ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
      map_data = [ @map_data ]
    end
    Skynet::TaskIterator.new(task_options, map_data)
  end
end
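For an Array map_data, Job#map_tasks above caps the number of mappers at the data's length. When no map_partitioner is set, it delegates the split to Skynet::Partitioners::SimplePartitionData.reduce_partition, whose algorithm this page does not show. The sketch reproduces the cap and substitutes a round-robin split. The round-robin rule is an assumption for illustration, not SimplePartitionData's documented behavior.

```ruby
# Caps the mapper count at the data size, as Job#map_tasks does, then deals items
# round-robin. The round-robin split is an assumption, not SimplePartitionData's algorithm.
def partition_for_mappers(map_data, mappers)
  num_mappers = map_data.length < mappers ? map_data.length : mappers
  return [] if num_mappers.zero?
  partitions = Array.new(num_mappers) { [] }
  map_data.each_with_index { |item, index| partitions[index % num_mappers] << item }
  partitions
end

partition_for_mappers([:a, :b, :c], 2)  # [[:a, :c], [:b]]
partition_for_mappers([:a, :b], 5)      # [[:a], [:b]]: only 2 mappers are used
```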

#master? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 604

def master?
  async? or not local_master?
end

#master_enqueue ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 305

def master_enqueue
  self.use_local_queue = local_master?
  messages = tasks_to_messages([master_task])
  enqueue_messages(messages)
end

#master_results ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 320

def master_results                     
  @results = gather_results(1,master_timeout,name) unless defined?(@results)
end

#master_retry ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 559

def master_retry
  @master_retry      || Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
end

#master_task ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 484

def master_task
  @master_task ||= begin    
    raise Exception.new("No map provided") unless @map

    # Make sure to set single to false in our own Job object.  
    # We're just passing along whether they set us to single.
    # If we were single, we'd never send off the master to be run externally.
    @single = false
    
    task = Skynet::Task.master_task(self)
  end
end

#mq ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 715

def mq
  if use_local_queue?
    local_mq
  else
    @mq ||= Skynet::MessageQueue.new
  end
end

#partition_data(post_map_data) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 350

def partition_data(post_map_data)
  info "RUN REDUCE 3.1 BEFORE PARTITION #{display_info} reducers: #{reducers}"
  debug "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data  
  printlog "RUN REDUCE 3.1 : #{reducers} #{name}, job_id:#{job_id}", post_map_data if data_debug?
  return unless post_map_data
  partitioned_data = nil
  if not @reduce_partition
    # =====================
    # = XXX HACK
    # = There was a bug in Job where the reduce_partition of master jobs wasn't being set!  This is to catch that.
    # = It handles it by checking if the map class has a reduce partitioner.  Maybe this is a good thing to leave anyway.
    # =====================
    if @map.is_a?(String) and @map.constantize.respond_to?(:reduce_partition)
      partitioned_data = @map.constantize.reduce_partition(post_map_data, reducers)
    else
      partitioned_data = Skynet::Partitioners::RecombineAndSplit.reduce_partition(post_map_data, reducers)
    end
  elsif @reduce_partition.is_a?(String)
    partitioned_data = @reduce_partition.constantize.reduce_partition(post_map_data, reducers)
  else
    partitioned_data = @reduce_partition.call(post_map_data, reducers)
  end
  partitioned_data.compact! if partitioned_data
  info "RUN REDUCE 3.2 AFTER PARTITION #{display_info} reducers: #{reducers}"
  debug "RUN REDUCE 3.2 AFTER PARTITION  #{display_info} data:", partitioned_data if partitioned_data
  printlog "RUN REDUCE 3.2 AFTER PARTITION  #{display_info} data:", partitioned_data if data_debug?
  partitioned_data
end

#reduce=(reduce) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 681

def reduce=(reduce)
  reset!
  return unless reduce
  if reduce.class == String or reduce.class == Class
    @reduce = reduce.to_s
  elsif reduce.is_a?(Proc)
    @reduce = reduce        
  else
    raise BadMapOrReduceError.new("#{self.class}.reduce accepts a class name or a proc. Got #{reduce}") 
  end      
end

#reduce_enqueue(partitioned_data) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 379

def reduce_enqueue(partitioned_data)    
  return partitioned_data unless @reduce and reducers and reducers > 0
  debug "RUN REDUCE 3.3 CREATED REDUCE TASKS #{display_info}", partitioned_data
  size = partitioned_data.size
  printlog "REDUCE MESSAGES TO ENQUEUE #{size}" if data_debug?
  
  reduce_tasks = self.reduce_tasks(partitioned_data)
  self.use_local_queue = reduce_local?(reduce_tasks)
  number_of_tasks = 0
  reduce_tasks.each_with_index do |task,ii|
    printlog "#{size - ii} REDUCE TASKS LEFT TO ENQUEUE" if data_debug?
    number_of_tasks += 1
    enqueue_messages(tasks_to_messages(task))
  end            
  return number_of_tasks
end

#reduce_local?(reduce_tasks) ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 587

def reduce_local?(reduce_tasks)
  return true if solo? or single?
  return true if keep_reduce_tasks == true
  return true if keep_reduce_tasks and reduce_tasks.data.is_a?(Array) and reduce_tasks.data.size <= keep_reduce_tasks
  return false
end

#reduce_results(number_of_tasks) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 396

def reduce_results(number_of_tasks)
  results = gather_results(number_of_tasks, reduce_timeout, reduce_name)
  printlog "REDUCE RESULTS", results if data_debug?
  if results.is_a?(Array) and results.first.is_a?(Array)
    final = []
    results.each do |result|
      final += result
    end
    results = final
  end
  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results size: #{results ? results.size : ''}"
  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results:", results if results
  printlog "POST REDUCE RESULTS", results if data_debug?
  return results
end
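The flattening step in reduce_results only inspects the first element. If the reducers returned Arrays, their outputs are concatenated into one Array. Anything else, such as an Array of Hashes, is returned unchanged. The sketch restates that rule as a standalone function.

```ruby
# Mirrors the flattening step in Job#reduce_results.
def merge_reduce_results(results)
  return results unless results.is_a?(Array) && results.first.is_a?(Array)
  results.inject([]) { |final, result| final + result }
end

merge_reduce_results([[[:a, 1]], [[:b, 2]]])  # [[:a, 1], [:b, 2]]
merge_reduce_results([{ a: 1 }, { b: 2 }])    # unchanged: Hash results are not merged
```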

#reduce_retry ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 567

def reduce_retry
  @reduce_retry      || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
end

#reduce_tasks(partitioned_data) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 534

def reduce_tasks(partitioned_data)
  @reduce_tasks ||= begin
    task_options = {
      :name           => reduce_name,
      :process        => @reduce,
      :map_or_reduce  => :reduce,
      :result_timeout => reduce_timeout,
      :retry          => reduce_retry || Skynet::CONFIG[:DEFAULT_REDUCE_RETRY]
    }
    Skynet::TaskIterator.new(task_options, partitioned_data)
  end
end

#reset! ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 624

def reset!
  @map_tasks    = nil
  @reduce_tasks = nil
end

#results ⇒ Object

Returns the final results of this map/reduce job. If you call results on an :async job, it blocks until the results are found or master_timeout is reached.

# File 'lib/skynet/skynet_job.rb', line 313

def results
  # ============================================
  # = FIXME Maybe this can have better warnings if the results aren't ready yet. =
  # ============================================
  master_results
end

#run(options = {}) ⇒ Object

Options are:

:local_master BOOL (DEFAULT true)

By default, your Skynet::Job acts as the master for your map/reduce job: it doles out
tasks, waits for other workers to complete and return their results, and merges and
partitions the data. If you run in async mode, another worker acts as the master for
your job and #run returns the job_id without blocking. If you run with
:async => false, :local_master => false, Skynet will let another worker be the master
for your job but will block waiting for the final results. The benefit of this is that
if your process dies, the job will continue to run remotely.

:async BOOL (DEFAULT false)

If you run in async mode, another worker will act as the master for your job, without blocking.
You cannot pass :local_master => true, :async => true, since the only way to allow your
job to run asynchronously is to have a remote master.

You can pass any option you might pass to Skynet::Job.new. Warning: passing options to run
permanently changes the properties of the job.

Raises:

  • (Error)

# File 'lib/skynet/skynet_job.rb', line 275

def run(options = {})
  FIELDS.each do |field|
    if options.has_key?(field)
      self.send("#{field}=".to_sym,options[field])
    end
  end
  raise Error.new("You can not run a local master in async mode.") if self.async and self.local_master
  
  info "RUN 1 BEGIN #{name}, job_id:#{job_id} vers: #{version} async:#{async}, local_master: #{local_master}, master?: #{master?}"
  
  # run the master task if we're running async or local_master
  if master?
    master_enqueue
    # ====================================================================================
    # = FIXME  If async Return a handle to an object that can used to retrieve the results later.
    # ====================================================================================
    async? ? job_id : master_results
  else
    number_of_tasks_queued = self.map_enqueue
    map_results            = self.map_results(number_of_tasks_queued)
    return map_results unless map_results and self.reduce

    partitioned_data       = self.partition_data(map_results)
    return unless partitioned_data
    number_of_tasks_queued = self.reduce_enqueue(partitioned_data)
    
    @results = self.reduce_results(number_of_tasks_queued)
  end
end

#run_master ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 710

def run_master
  error "run_master has been deprecated, please use run"
  run(:local_master => false)
end

#single? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 616

def single?
  @single
end

#solo? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 612

def solo?
  (@solo or CONFIG[:SOLO])
end

#start_after=(time) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 660

def start_after=(time)
  @start_after = (time.is_a?(Time) ? time.to_i : time)
end

#task_id ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 640

def task_id
  @task_id ||= get_unique_id(1).to_i
end

#tasks_to_messages(tasks) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 547

def tasks_to_messages(tasks)
   if tasks.is_a?(Skynet::TaskIterator)
     tasks = tasks.to_a
   elsif not tasks.is_a?(Array)
     tasks = [tasks]
   end

   tasks.collect do |task|
     Skynet::Message.new_task_message(task,self)
   end
end

#to_h ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 629

def to_h
  if @map.kind_of?(Proc) or @reduce.kind_of?(Proc)
    raise Skynet::Error.new("You have a Proc in your map or reduce. This can't be turned into a hash.")
  end
  hash = {}
  FIELDS.each do |field|      
    hash[field] = self.send(field) if self.send(field)
  end       
  hash
end

#use_local_queue? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/skynet/skynet_job.rb', line 594

def use_local_queue?
  @use_local_queue
end

#version ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 644

def version
  return 1 if solo?
  @version ||= begin
    @@worker_version ||= self.mq.get_worker_version || 1
    @@worker_version
  end
end

#version=(v) ⇒ Object

# File 'lib/skynet/skynet_job.rb', line 652

def version=(v)
  @version = v
end