Top Level Namespace

Defined Under Namespace

Modules: BabelHelper, CommonFunctions, ExodusHelper, NeptuneHelper

Classes: BadConfigurationException, ExodusTaskInfo, FileNotFoundException, NeptuneManagerClient, NeptuneManagerException, Object, TaskInfo

Constant Summary

NEEDS_BUCKET_INFO =

If the user doesn’t give us enough info to infer what bucket we should place their code in, this message is displayed and execution aborts.

"When running Babel jobs with local inputs / code, the " +
"bucket to store them in must be specified by either the :bucket_name " +
"parameter or the BABEL_BUCKET_NAME environment variable."
DOES_NOT_EXIST =

The constant string that a Neptune output job returns if the output does not yet exist.

"error: output does not exist"
SLEEP_TIME =

The initial amount of time, in seconds, to sleep between output job requests. An exponential backoff is used with this value as the starting sleep time.

5
MAX_SLEEP_TIME =

The maximum amount of time that we should sleep to, when waiting for output job requests.

60
ALLOWED_JOB_TYPES =

A list of all the Neptune job types that we support.

%w{acl cicero compile erlang mpi input output ssa babel upc x10 mapreduce}
JOB_TYPE_NOT_ALLOWED =

The string to display for disallowed job types.

"The job type you specified is not supported."
NO_NODES_NEEDED =

A list of Neptune job types that do not require nodes to be spawned for computation.

["acl", "input", "output", "compile"]
NO_OUTPUT_NEEDED =

A list of Neptune job types that do not require the output to be specified beforehand.

["input"]
ALLOWED_STORAGE_TYPES =

A list of storage mechanisms that we can use to store and retrieve data for Neptune jobs.

%w{appdb gstorage s3 walrus waz-storage}
NEED_PREPROCESSING =

A list of jobs that require some kind of work to be done before the actual computation can be performed.

["babel", "compile", "erlang", "mpi", "ssa"]
NO_TIMEOUT =

Sometimes SOAP calls take a long time when large amounts of data are sent over the network. For this first version we don't want these calls to endlessly time out and retry, so as a hack we simply don't let them time out. The next version should replace this with a proper timeout and avoid long-running calls unless necessary.

100000
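Taken together, SLEEP_TIME, MAX_SLEEP_TIME, and DOES_NOT_EXIST describe an exponential-backoff polling loop for output requests. A minimal sketch of that loop (an illustration only, not the gem's actual implementation; `wait_for_output` and the block it yields to are hypothetical):

```ruby
SLEEP_TIME = 5
MAX_SLEEP_TIME = 60
DOES_NOT_EXIST = "error: output does not exist"

# Repeatedly ask for a job's output (the block stands in for the real
# output request), doubling the wait between attempts and capping it
# at MAX_SLEEP_TIME. Returns the output, or nil if we give up.
def wait_for_output(max_attempts = 10)
  sleep_time = SLEEP_TIME
  max_attempts.times do
    output = yield
    return output unless output == DOES_NOT_EXIST
    sleep(sleep_time)
    sleep_time = [sleep_time * 2, MAX_SLEEP_TIME].min
  end
  nil
end
```

With these constants the waits between attempts run 5, 10, 20, 40, 60, 60, ... seconds.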

Instance Method Summary

Instance Method Details

#babel(jobs) ⇒ Object

Babel provides a nice wrapper around Neptune jobs. Instead of making users write multiple Neptune jobs to actually run code (e.g., putting the input in the datastore, running the job, getting the output back), Babel handles this automatically.



# File 'lib/babel.rb', line 44

def babel(jobs)
  # Since this whole function should run asynchronously, we run it as a future.
  # It automatically starts running in a new thread, and attempting to get the
  # value of what this returns causes it to block until the job completes.
  #future {
    if jobs.class == Hash
      was_one_job = true
      jobs = [jobs]
    else
      was_one_job = false
    end

    job_data_list = []
    jobs.each { |params|
      job_data = BabelHelper.convert_from_neptune_params(params)
      job_data['@metadata_info'] = {'time_to_store_inputs' => 0.0}

      # Add in S3 storage parameters
      NeptuneHelper.validate_storage_params(job_data)

      # :code is the only required parameter
      # everything else can use default vals
      NeptuneHelper.require_param("@code", job_data)
      BabelHelper.check_output_files(job_data)

      if job_data["@is_remote"]
        #BabelHelper.validate_inputs(job_data)
      else
        BabelHelper.put_code(job_data)
        BabelHelper.put_inputs(job_data)
      end

      job_data_list << job_data
    }

    BabelHelper.run_job(job_data_list)

    # Return an object to the user that has all the information about their
    # task, including its standard out, err, debugging info, and profiling
    # info. Since the job may not be done when the user asks for this info,
    # it's the responsibility of TaskInfo objects to block until that info
    # is ready. We don't explicitly return the TaskInfo object, because it's
    # in a Future block - it will automatically return whatever the last
    # statement returns.
    tasks = []
    job_data_list.each { |job_data|
      tasks << TaskInfo.new(job_data)
    }

    if was_one_job
      tasks[0]
    else
      tasks
    end
  #}
end
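For reference, babel accepts either a single Hash of job parameters or an Array of them; :code is the only required parameter, and when inputs or code are local, the bucket must come from :bucket_name or the BABEL_BUCKET_NAME environment variable. A hedged sketch of a single-job call (the paths and bucket name are hypothetical, and the call itself is commented out since it needs a running deployment):

```ruby
# require 'babel'  # from the neptune gem

params = {
  :code => "ruby/add.rb",             # the only required parameter (hypothetical path)
  :argv => [1, 2],                    # arguments passed to the code
  :bucket_name => "my-babel-bucket",  # or set BABEL_BUCKET_NAME instead
  :storage => "s3"                    # one of ALLOWED_STORAGE_TYPES
}

# task = babel(params)   # one Hash in => one TaskInfo back
# task.stdout            # TaskInfo blocks here until the job is done
```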

#exodus(jobs) ⇒ Object

Exodus provides further improvements over Babel. Instead of making users tell us which compute, storage, and queue services they want to use (required for babel calls), Exodus handles this automatically. Callers need only specify which clouds their job can run over, and Exodus selects the best cloud for the job and runs it there.



# File 'lib/exodus.rb', line 20

def exodus(jobs)
  if jobs.class == Hash
    job_given_as_hash = true
    jobs = [jobs]
  elsif jobs.class == Array
    job_given_as_hash = false
    ExodusHelper.ensure_all_jobs_are_hashes(jobs)
  else
    raise BadConfigurationException.new("jobs was a #{jobs.class}, which " +
      "is not an acceptable class type")
  end

  tasks = []

  jobs.each { |job|
    ExodusHelper.ensure_all_params_are_present(job)
    profiling_info = ExodusHelper.get_profiling_info(job)
    clouds_to_run_task_on = ExodusHelper.get_clouds_to_run_task_on(job, 
      profiling_info)
    babel_tasks_to_run = ExodusHelper.generate_babel_tasks(job, 
      clouds_to_run_task_on)
    dispatched_tasks = ExodusHelper.run_job(babel_tasks_to_run)
    tasks << ExodusTaskInfo.new(dispatched_tasks)
  }

  if job_given_as_hash
    return tasks[0]
  else
    return tasks
  end
end
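As the code above shows, a Hash in yields a single ExodusTaskInfo back, an Array yields an Array of them, and anything else raises BadConfigurationException. A hedged sketch of a single-job call (the path and the :clouds_to_use key are assumptions for illustration):

```ruby
# require 'exodus'  # from the neptune gem

job = {
  :code => "ruby/add.rb",        # hypothetical path to the code to run
  :argv => [1, 2],
  :clouds_to_use => :AmazonEC2   # assumed key: clouds this job may run over
}

# task  = exodus(job)          # Hash in  => one ExodusTaskInfo back
# tasks = exodus([job, job])   # Array in => Array of ExodusTaskInfo back
```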

#neptune(jobs) ⇒ Object

This method is the heart of Neptune: here, we take blocks of code that the user has written and convert them into HPC job requests. At a high level, the user can request to run a job, retrieve a job's output, or modify the access policy (ACL) for the output of a job. By default, job data is private, but a Neptune job can be used to make it public later (and vice versa).



# File 'lib/neptune.rb', line 58

def neptune(jobs)
  # Kernel.puts "Received a request to run a job."
  # Kernel.puts params[:type]
  if jobs.class == Hash
    jobs = [jobs]
  end

  job_data_list = []
  shadow_ip = nil
  ssh_args = ""
  secret = ""
  controller = nil

  jobs.each { |params|
    job_data = NeptuneHelper.get_job_data(params)
    NeptuneHelper.validate_storage_params(job_data)
    # Kernel.puts "job data = #{job_data.inspect}"
    keyname = job_data["@keyname"]

    shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)
    secret = CommonFunctions.get_secret_key(keyname)
    ssh_key = File.expand_path("~/.appscale/#{keyname}.key")
    ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no "

    controller = NeptuneManagerClient.new(shadow_ip, secret)
    NeptuneHelper.do_preprocessing(job_data, controller)
    job_data_list << job_data
  }

  if job_data_list.length == 1
    return NeptuneHelper.run_job(job_data_list[0], ssh_args, shadow_ip, 
      secret)
  else  # right now we only support batch run_job operations
    msg = controller.start_neptune_job(job_data_list)
    result = {}
    result[:msg] = msg
    if result[:msg] =~ /job is now running\Z/
      result[:result] = :success
    else
      result[:result] = :failure
    end

    return result
  end
end
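To illustrate the three kinds of requests described above (running a job, retrieving its output, and changing its ACL): the :type values come from ALLOWED_JOB_TYPES, but the specific paths and the :acl key are assumptions, and the calls are commented out since they need a running AppScale deployment:

```ruby
# require 'neptune'  # from the neptune gem

# 1. Run an MPI job ("mpi" is in ALLOWED_JOB_TYPES).
run_job = {
  :type => "mpi",
  :code => "/code/MPI/ring",   # hypothetical remote path
  :nodes_to_use => 4,
  :output => "/outputs/ring-1"
}

# 2. Retrieve that job's output; while the job is still running this
#    returns DOES_NOT_EXIST ("error: output does not exist").
get_output = { :type => "output", :output => "/outputs/ring-1" }

# 3. Make the (private-by-default) output public (:acl key is assumed).
set_acl = { :type => "acl", :output => "/outputs/ring-1", :acl => "public" }

# [run_job, get_output, set_acl].each { |job| neptune(job) }
```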