Class: Skynet::Job

Inherits:
Object
  • Object
show all
Includes:
GuidGenerator, SkynetDebugger
Defined in:
lib/skynet/skynet_job.rb,
lib/skynet/skynet_tuplespace_server.rb

Direct Known Subclasses

AsyncJob

Defined Under Namespace

Classes: BadMapOrReduceError, Error, WorkerError

Constant Summary collapse

# Attribute names that #to_h copies into its hash; each one is read by
# calling the same-named method on the job. Every name is listed once
# (the list previously contained :single twice).
FIELDS =
[:map_tasks, :reduce_tasks, :silent, :name, :map_timeout, :map_data, :job_id,
  :reduce_timeout, :master_timeout, :master, :map_name, :reduce_name, :async,
  :master_result_timeout, :result_timeout, :start_after, :solo, :single,
  :map, :map_partitioner, :reduce, :reduce_partitioner
]
@@svn_rev =
nil
@@worker_ver =
nil
@@log =
nil

Class Method Summary collapse

Instance Method Summary collapse

Methods included from GuidGenerator

#get_unique_id

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #warn

Constructor Details

#initialize(opts = {}) ⇒ Job

Returns a new instance of Job.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/skynet/skynet_job.rb', line 50

# Builds a job from an options hash.
#
# opts keys read here:
#   :name, :map_name, :reduce_name, :silent, :master, :async, :solo, :single
#       copied as-is (nil when absent)
#   :version
#       stored only when truthy
#   :map_tasks (2), :reduce_tasks (1), :map_timeout (60), :reduce_timeout (60),
#   :master_timeout (60), :result_timeout (1200), :start_after (0),
#   :master_result_timeout (1200)
#       value in parentheses is used when the option is nil/false
#   :map_data
#       copied as-is
#   :map_reduce_class
#       when truthy it is handed to #map_reduce_class= and :map / :reduce are ignored
#   :map, :reduce
#       assigned through their writers when truthy and no :map_reduce_class is given
#
# The job id is taken from #task_id.
def initialize(opts = {})
  # Options stored verbatim.
  [:name, :map_name, :reduce_name, :silent, :master, :async, :solo, :single].each do |key|
    instance_variable_set("@#{key}", opts[key])
  end

  @version = opts[:version] if opts[:version]

  # Options that fall back to a default when not supplied.
  fallbacks = {
    :map_tasks             => 2,
    :reduce_tasks          => 1,
    :map_timeout           => 60,
    :reduce_timeout        => 60,
    :master_timeout        => 60,
    :result_timeout        => 1200,
    :start_after           => 0,
    :master_result_timeout => 1200
  }
  fallbacks.each do |key, fallback|
    instance_variable_set("@#{key}", opts[key] || fallback)
  end

  @map_data = opts[:map_data]

  combined_class = opts[:map_reduce_class]
  if combined_class
    self.map_reduce_class = combined_class
  else
    self.map    = opts[:map] if opts[:map]
    self.reduce = opts[:reduce] if opts[:reduce]
  end

  @job_id = task_id
end

Class Method Details

.debug_class_descObject



46
47
48
# File 'lib/skynet/skynet_job.rb', line 46

# Short label for this class; returns the fixed string "JOB".
# NOTE(review): presumably consumed by SkynetDebugger when tagging log lines — confirm.
def self.debug_class_desc
  label = "JOB"
  label
end

Instance Method Details

#display_infoObject



116
117
118
# File 'lib/skynet/skynet_job.rb', line 116

# One-line description of the job for log messages: the job name followed
# by its job id.
def display_info
  label      = name
  identifier = job_id
  "#{label}, job_id: #{identifier}"
end

#increment_worker_versionObject



120
121
122
123
124
# File 'lib/skynet/skynet_job.rb', line 120

# Reads the worker version from the message queue, adds one, writes the new
# value back and returns it.
#
# A missing (nil) stored version is treated as 0, so the first increment
# yields 1 instead of raising NoMethodError on nil.
#
# NOTE(review): the read and the write are two separate queue calls; whether
# concurrent callers can interleave depends on the queue implementation.
def increment_worker_version
  newver = (mq.get_worker_version || 0) + 1
  mq.set_worker_version(newver)
  newver
end

#map_reduce_class=(klass) ⇒ Object



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/skynet/skynet_job.rb', line 204

# Configures the job from a single class (or class name) that provides the
# map step and, optionally, reduce / partitioner steps.
#
# klass - a String or a Class; anything else raises BadMapOrReduceError.
#
# The class name is stored as the map handler. Job, map and reduce names are
# filled in only when they are not already set. The reduce handler and the
# two partitioners are set to the class name only when the resolved class
# responds to :reduce, :reduce_partitioner and :map_partitioner respectively.
def map_reduce_class=(klass)
  acceptable = [String, Class].include?(klass.class)
  raise BadMapOrReduceError.new("#{self.class}.map_reduce only accepts a class name") unless acceptable

  klass = klass.to_s
  @map = klass
  self.name     ||= "#{klass} MASTER"
  self.map_name ||= "#{klass} MAP"

  # Resolve the constant once and probe it for the optional hooks.
  resolved = klass.constantize
  if resolved.respond_to?(:reduce)
    @reduce = klass
    self.reduce_name ||= "#{klass} REDUCE"
  end
  @reduce_partitioner = klass if resolved.respond_to?(:reduce_partitioner)
  @map_partitioner    = klass if resolved.respond_to?(:map_partitioner)
end

#master_taskObject

# Submits the master task through #run_tasks (using the master timeout and
# the job name as description), logs the outcome at debug level and returns it.
def run_master
  outcome = run_tasks(master_task, master_timeout, name)
  debug("MASTER RESULT #{name} job_id: #{job_id}", outcome)
  outcome
end


230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/skynet/skynet_job.rb', line 230

# Builds (once) and returns the Skynet::Task that runs this job as a
# "master" task. The result is memoized in @master_task.
#
# A second Skynet::Job is created with this job's map/reduce settings; the
# returned task's :process is a lambda that logs a debug line and calls #run
# on that inner job.
#
# Fixes relative to the previous version:
#   * the lambda was passed as a :process option to Skynet::Job.new, leaving
#     `process` undefined where Skynet::Task.new reads it; it is now a local
#     that the task receives.
#   * :job_id was the literal Symbol :task_id; it now carries the value of
#     #task_id.
#
# Raises Exception ("No map provided") when no map handler is set.
# NOTE(review): #run_job raises ArgumentError for the same condition;
# the broader class is kept here so existing rescuers are unaffected.
def master_task
  @master_task ||= begin
    raise Exception.new("No map provided") unless @map
    set_version

    job = Skynet::Job.new(
      :map_timeout        => map_timeout,
      :reduce_timeout     => reduce_timeout,
      :job_id             => task_id,
      :map_data           => @map_data,
      :map_name           => map_name || name,
      :reduce_name        => reduce_name || name,
      :map                => @map,
      :map_partitioner    => @map_partitioner,
      :reduce             => @reduce,
      :reduce_partitioner => @reduce_partitioner,
      :map_tasks          => @map_tasks,
      :reduce_tasks       => @reduce_tasks,
      :name               => @name,
      :version            => version
    )

    # The task's argument is not used; the lambda closes over the inner job.
    process = lambda do |_data|
      debug "RUNNING MASTER RUN #{name}, job_id:#{job_id}"
      job.run
    end

    Skynet::Task.new(
      :task_id        => task_id,
      :data           => :master,
      :process        => process,
      :map_or_reduce  => :master,
      :name           => self.name,
      :result_timeout => master_result_timeout
    )
  end
end

#mqObject



92
93
94
# File 'lib/skynet/skynet_job.rb', line 92

# Returns this job's Skynet::MessageQueue, creating it on first use and
# reusing the same instance afterwards.
def mq
  return @mq if @mq
  @mq = Skynet::MessageQueue.new
end

#runObject

Run the job and return result arrays



268
269
270
# File 'lib/skynet/skynet_job.rb', line 268

# Runs the job; delegates to #run_job and returns whatever it returns.
def run
  run_job()
end

#run_jobObject

Raises:

  • (ArgumentError)


272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/skynet/skynet_job.rb', line 272

# Runs the map phase and then the reduce phase, logging a debug line before
# each step and after completion.
#
# Returns the value of #run_reduce, or the falsy map result unchanged when
# #run_map produced nothing (the reduce phase is skipped in that case).
#
# Raises ArgumentError when no map handler is assigned.
def run_job
  tag = lambda { "#{name}, job_id:#{job_id}" }

  debug "RUN 1 BEGIN #{tag.call}"
  set_version
  raise ArgumentError, "map lambdas not assigned" if !@map

  debug "RUN 2 MAP pre run_map #{tag.call}"
  mapped = run_map

  # Logged before the result is inspected, so it appears even when reduce is skipped.
  debug "RUN 3 REDUCE pre run_reduce #{tag.call}"
  if mapped
    reduced = run_reduce(mapped)
    debug "RUN 4 FINISHED run_job #{tag.call}"
    reduced
  else
    mapped
  end
end

#run_mapObject

Partition up starting data, create map tasks



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/skynet/skynet_job.rb', line 292

# Partitions the starting data, creates one map task per partition and runs
# them through #run_tasks.
#
# How @map_data is split:
#   * Array      - partitioned into at most @map_tasks pieces (fewer when the
#                  array is shorter); one task per piece.
#   * Enumerable - one task per yielded element.
#   * otherwise  - a single task carrying the value as-is.
#
# The map partitioner may be absent (the default simple partitioner is
# used), a class-name String as stored by #map_reduce_class= (resolved and
# asked for .map_partitioner, mirroring #run_reduce_partitioner), or any
# object responding to #call.
#
# Returns the map results (nil entries removed when it is an Array), or nil
# when a WorkerError occurred or no result came back.
def run_map
  map_tasks = []

  # The final branch accepts values without #size (nil, for instance),
  # so the size is only read when it is available.
  data_size = @map_data.respond_to?(:size) ? @map_data.size : "unknown"
  debug "RUN MAP 2.1 #{display_info} data size before partition: #{data_size}"
  debug "RUN MAP 2.1 #{display_info} data before partition:", @map_data

  # Every map task differs only in the data it carries.
  build_task = lambda do |chunk|
    Skynet::Task.new(
      :task_id        => get_unique_id(1),
      :data           => chunk,
      :process        => @map,
      :name           => map_name,
      :map_or_reduce  => :map,
      :result_timeout => result_timeout
    )
  end

  if @map_data.class == Array
    debug "RUN MAP 2.2 DATA IS Array #{display_info}"
    num_mappers = @map_data.length < @map_tasks ? @map_data.length : @map_tasks
    pre_map_data =
      if !@map_partitioner
        Partitioner::simple_partition_data(@map_data, num_mappers)
      elsif @map_partitioner.class == String
        @map_partitioner.constantize.map_partitioner(@map_data, num_mappers)
      else
        @map_partitioner.call(@map_data, num_mappers)
      end
    debug "RUN MAP 2.3 #{display_info} data size after partition: #{pre_map_data.size}"
    debug "RUN MAP 2.3 #{display_info} map data after partition:", pre_map_data

    num_mappers.times do |i|
      map_tasks << build_task.call(pre_map_data[i])
    end
  elsif @map_data.is_a?(Enumerable)
    debug "RUN MAP 2.2 DATA IS ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
    # NOTE(review): objects responding to #next are iterated with `next`,
    # which only works if that method yields to the block — confirm against
    # the data sources actually used.
    each_method = @map_data.respond_to?(:next) ? :next : :each
    @map_data.send(each_method) do |element|
      map_tasks << build_task.call(element)
    end
  else
    debug "RUN MAP 2.2 DATA IS NOT ARRAY OR ENUMERABLE #{display_info} map_data_class: #{@map_data.class}"
    map_tasks = [build_task.call(@map_data)]
  end

  begin
    post_map_data = run_tasks(map_tasks, map_timeout, map_name)
  rescue WorkerError => e
    error "MAP FAILED #{display_info} #{e.class} #{e.message.inspect}"
    return nil
  end

  debug "RUN MAP 2.5 RESULTS AFTER RUN #{display_info} results:", post_map_data.inspect

  return nil unless post_map_data

  post_map_data.compact! if post_map_data.class == Array

  post_map_data
end

#run_reduce(post_map_data = nil) ⇒ Object

Re-partition returning data for reduction, create reduce tasks



366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/skynet/skynet_job.rb', line 366

# Re-partitions the map output, creates one reduce task per partition and
# runs them through #run_tasks.
#
# post_map_data - the map phase output. Returned unchanged when it is falsy
#                 or when no reduce handler is configured.
#
# Returns the reduce results. When the results are an Array whose first
# element is a Hash, the Hash elements are merged into a single Hash (other
# elements are dropped). Returns nil when a WorkerError occurred.
def run_reduce(post_map_data=nil)
  return post_map_data if !post_map_data || !@reduce

  num_reducers = @reduce_tasks
  debug "RUN REDUCE 3.1 BEFORE PARTITION #{display_info} num_reducers: #{num_reducers}"

  reduce_data = run_reduce_partitioner(post_map_data, num_reducers)
  reduce_data.compact!
  debug "RUN REDUCE 3.2 AFTER PARTITION #{display_info} num_reducers: #{reduce_data.length}"
  debug "RUN REDUCE 3.2 AFTER PARTITION  #{display_info} data:", reduce_data

  # One task per surviving partition, in partition order.
  reduce_tasks = Array.new(reduce_data.length) do |index|
    Skynet::Task.new(
      :task_id        => get_unique_id(1),
      :data           => reduce_data[index],
      :name           => reduce_name,
      :process        => @reduce,
      :map_or_reduce  => :reduce,
      :result_timeout => result_timeout
    )
  end
  reduce_tasks.compact!

  debug "RUN REDUCE 3.3 CREATED REDUCE TASKS #{display_info}"

  results =
    begin
      run_tasks(reduce_tasks, reduce_timeout, reduce_name)
    rescue WorkerError => e
      error "REDUCE FAILED #{display_info} #{e.class} #{e.message.inspect}"
      return nil
    end

  if results.class == Array && results.first.class == Hash
    results = results.inject({}) do |merged, item|
      merged.merge!(item) if item.class == Hash
      merged
    end
  end

  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results size: #{results.size}"
  debug "RUN REDUCE 3.4 AFTER REDUCE #{display_info} results:", results
  results
end

#run_reduce_partitioner(post_map_data, num_reducers) ⇒ Object



414
415
416
417
418
419
420
421
422
# File 'lib/skynet/skynet_job.rb', line 414

# Splits the map output into partitions for the reduce phase.
#
# Dispatch on the configured reduce partitioner:
#   * none            - the callable returned by Partitioner::recombine_and_split
#   * exactly String  - the named class is resolved and its .reduce_partitioner called
#   * anything else   - invoked via #call
#
# Returns whatever the selected partitioner returns.
def run_reduce_partitioner(post_map_data,num_reducers)
  partitioner = @reduce_partitioner

  unless partitioner
    return Partitioner::recombine_and_split.call(post_map_data, num_reducers)
  end

  if partitioner.class == String
    return partitioner.constantize.reduce_partitioner(post_map_data, num_reducers)
  end

  partitioner.call(post_map_data, num_reducers)
end

#run_tasks(tasks, timeout = 5, description = "Generic Task") ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/skynet/skynet_job.rb', line 134

# Runs a set of tasks and collects their results.
#
# tasks       - a task or an Array of tasks (a single task is wrapped).
# timeout     - per-task expiry; also scales the queue write/read timeouts.
# description - label used in log lines and as the message name.
#
# Behaviour:
#   * solo or single mode - each task is run in-process; returns the Array
#     of their return values.
#   * async               - tasks are written to the queue; returns true
#     without waiting.
#   * otherwise           - tasks are written to the queue and results are
#     gathered until every task id has answered (with a result or an
#     error); returns the Array of successful payloads.
#
# An empty task list in queued mode returns [] immediately: nothing was
# written, so waiting for results could only end in an expiry.
#
# Raises WorkerError when the wait expires and at least one worker reported
# an error; raises Skynet::RequestExpiredError when it expires with no
# errors reported.
def run_tasks(tasks,timeout = 5,description = "Generic Task")
  result = {}
  errors = {}
  # Local queue for this call; shadows the #mq method inside this method.
  mq     = Skynet::MessageQueue.new unless solo?
  t1     = Time.now
  tasks  = [tasks] unless tasks.class == Array
  info "RUN TASKS #{description} ver: #{self.version} jobid: #{job_id} @ #{t1}"

  # write tasks to the MessageQueue
  task_ids = []
  tasks.each do |task|
    debug "RUN TASKS SUBMITTING #{description} task #{task.task_id} job_id: #{job_id}"
    if solo? || single?
      result[task.task_id] = task.run
    else
      task_ids << task.task_id
      worker_message = Skynet::Message.new(
        :tasktype     => :task,
        :job_id       => job_id,
        :task_id      => task.task_id,
        :payload      => task,
        :payload_type => task.task_or_master,
        :expiry       => timeout,
        :expire_time  => @start_after,
        :iteration    => 0,
        :name         => description,
        :version      => @version
      )
      debug "RUN TASKS WORKER MESSAGE #{description} job_id: #{job_id}", worker_message.to_a
      mq.write_message(worker_message,timeout * 5)
    end
  end

  return result.values if solo? || single?
  return true if async

  # Nothing was queued, so there is nothing to wait for.
  return [] if task_ids.empty?

  debug "GATHER RESULTS for #{description} job_id: #{job_id} - NOT AN ASYNC JOB"

  # retrieve results unless async
  begin
    loop do
      result_message = mq.take_result(job_id,timeout * 2)

      ret_result = result_message.payload
      if result_message.payload_type == :error
        errors[result_message.task_id] = ret_result
        error "ERROR RESULT TASK #{result_message.task_id} returned #{errors[result_message.task_id].inspect}"
      else
        result[result_message.task_id] = ret_result
        debug "RESULT returned TASKID: #{result_message.task_id} #{result[result_message.task_id].inspect}"
      end
      debug "RESULT collected: #{(result.keys + errors.keys).size}, remaining: #{(task_ids - (result.keys + errors.keys)).size}"
      break if (task_ids - (result.keys + errors.keys)).empty?
      # NOTE(review): this branch cannot fire. Reaching it means some task id
      # is still missing from both hashes, and that id also survives the
      # subtraction of the (smaller) intersection below. Kept unchanged
      # because the intended condition is not evident from this method.
      if (task_ids - (result.keys & errors.keys)).empty?
        raise Skynet::Job::Error.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{task_ids.size} workers. #{errors.pretty_print_inspect}")
      end
    end
  rescue Skynet::RequestExpiredError => e
    error "A WORKER EXPIRED or ERRORED, #{description}, job_id: #{job_id}"
    if not errors.empty?
      raise WorkerError.new("WORKER ERROR #{description}, job_id: #{job_id} errors:#{errors.keys.size} out of #{task_ids.size} workers. #{errors.pretty_print_inspect}")
    else
      raise Skynet::RequestExpiredError.new("WORKER ERROR, A WORKER EXPIRED!  Did not get results or even errors back from all workers!")
    end
  end
  info "RUN TASKS COMPLETE #{description} jobid: #{job_id} TOOK: #{Time.now - t1}"
  result.values
end

#set_versionObject

set_version is intended to detect when the worker version needs to be upgraded. That logic has not been implemented yet, so the method currently just returns true.



97
98
99
100
101
102
103
104
# File 'lib/skynet/skynet_job.rb', line 97

# Placeholder for worker-version synchronisation; currently always returns
# true and touches nothing.
#
# Sketch of the intended logic, kept for reference (disabled):
#   return 1 if solo?
#   oldver = mq.get_worker_version || 0
#   if oldver != self.version
#     mq.set_worker_version(self.version)
#   end
def set_version
  return true
end

#single?Boolean

Returns:

  • (Boolean)


130
131
132
# File 'lib/skynet/skynet_job.rb', line 130

# Whether the job was created with the :single option; returns the stored
# value as-is (nil when it was never set).
def single?
  return @single
end

#solo?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/skynet/skynet_job.rb', line 126

# Whether the job runs in solo mode: the job's own :solo setting when it is
# truthy, otherwise the global CONFIG[:SOLO] value.
def solo?
  @solo || CONFIG[:SOLO]
end

#task_idObject



220
221
222
# File 'lib/skynet/skynet_job.rb', line 220

# Returns this job's task id, generating it with get_unique_id(1) on first
# use and reusing the same value afterwards.
def task_id
  @task_id = get_unique_id(1) unless @task_id
  @task_id
end

#to_hObject



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/skynet/skynet_job.rb', line 80

# Converts the job into a Hash of its FIELDS.
#
# Each field is read by calling the same-named method; fields whose value
# is nil or false are left out.
#
# Raises Skynet::Error when the map or reduce handler is a Proc.
def to_h
  if [@map, @reduce].any? { |handler| handler.kind_of?(Proc) }
    raise Skynet::Error.new("You have a Proc in your map or reduce. This can't be turned into a hash.")
  end

  FIELDS.inject({}) do |snapshot, field|
    snapshot[field] = self.send(field) if self.send(field)
    snapshot
  end
end

#versionObject



106
107
108
109
110
# File 'lib/skynet/skynet_job.rb', line 106

# Worker version used by this job.
#
# Solo jobs always report 1. Otherwise the version is read from the message
# queue once, cached in the class variable @@worker_version, and then
# memoized per job in @version (an explicitly assigned @version wins).
def version
  if solo?
    1
  else
    @@worker_version ||= mq.get_worker_version
    @version ||= @@worker_version
  end
end

#version=(v) ⇒ Object



112
113
114
# File 'lib/skynet/skynet_job.rb', line 112

# Sets the worker version this job reports (stored in @version).
def version=(v)
  @version = v
end