Class: Mortar::Command::Spark

Inherits:
Base
  • Object
show all
Includes:
Git
Defined in:
lib/mortar/command/spark.rb

Overview

run spark jobs

Instance Attribute Summary

Attributes inherited from Base

#args, #options

Instance Method Summary collapse

Methods inherited from Base

#api, #ask_public, #config_parameters, #get_error_message_context, #git, #initialize, #initialize_embedded_project, #luigi_parameters, namespace, #pig_parameters, #project, #register_api_call, #register_do, #register_project, #spark_script_arguments, #validate_project_name, #validate_project_structure

Methods included from Helpers

#action, #ask, #confirm, #copy_if_not_present_at_dest, #default_host, #deprecate, #display, #display_header, #display_object, #display_row, #display_table, #display_with_indent, #download_to_file, #ensure_dir_exists, #error, error_with_failure, error_with_failure=, extended, extended_into, #format_bytes, #format_date, #format_with_bang, #full_host, #get_terminal_environment, #home_directory, #host, #hprint, #hputs, included, included_into, #installed_with_omnibus?, #json_decode, #json_encode, #line_formatter, #longest, #output_with_bang, #pending_github_team_state_message, #quantify, #redisplay, #retry_on_exception, #running_on_a_mac?, #running_on_windows?, #set_buffer, #shell, #spinner, #status, #string_distance, #styled_array, #styled_error, #styled_hash, #styled_header, #suggestion, #test_name, #ticking, #time_ago, #truncate, #warning, #with_tty, #write_to_file

Constructor Details

This class inherits a constructor from Mortar::Command::Base

Instance Method Details

#indexObject

spark SCRIPT

Run a spark job.

-c, --clusterid CLUSTERID   # Run job on an existing cluster with ID of CLUSTERID (optional)
-s, --clustersize NUMNODES  # Run job on a new cluster, with NUMNODES nodes (optional; must be >= 2 if provided)
-1, --singlejobcluster      # Stop the cluster after job completes. (Default: false--cluster can be used for other jobs, and will shut down after 1 hour of inactivity)
-2, --permanentcluster      # Don't automatically stop the cluster after it has been idle for an hour (Default: false--cluster will be shut down after 1 hour of inactivity)
-3, --spot                  # Use spot instances for this cluster (Default: false, only applicable to new clusters)
-P, --project PROJECTNAME   # Use a project that is not checked out in the current directory. Runs code from project's master branch in GitHub rather than snapshotting local code.
-B, --branch BRANCHNAME     # Used with --project to specify a non-master branch

Examples:

Run the classify_text sparkscript:
    $ mortar spark sparkscripts/classify_text.py

Run the classify_text sparkscript with 3 script arguments (input location, output location, tuning parameter):
    $ mortar spark sparkscripts/classify_text.py s3://your-bucket/input s3://your-bucket/output 100


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/mortar/command/spark.rb', line 46

# spark SCRIPT
#
# Submit a Spark job for execution, either on an existing cluster
# (--clusterid), a newly provisioned one (--clustersize), or -- when
# neither is given -- the largest idle running cluster (falling back to
# a new 2-node cluster if none is free).
#
# Returns the job_id String of the submitted job.
# Calls error() (which aborts) on invalid usage or conflicting options.
def index
  script_name = shift_argument
  unless script_name
    error("Usage: mortar spark SCRIPT\nMust specify SCRIPT.")
  end

  # With --project we run code from the project's GitHub repo, so there is
  # no local checkout to resolve the script against: `script` stays nil.
  script = nil
  if options[:project]
    project_name = options[:project]
  else
    project_name = project.name
    script = validate_sparkscript!(script_name)
    script_name = script.name
  end

  script_arguments = spark_script_arguments()

  # New-cluster options are meaningless (and likely a user mistake) when
  # targeting an existing cluster, so reject them explicitly.
  if options[:clusterid]
    [:clustersize, :singlejobcluster, :permanentcluster].each do |opt|
      unless options[opt].nil?
        error("Option #{opt.to_s} cannot be set when running a job on an existing cluster (with --clusterid option)")
      end
    end
  end

  if options[:project]
    # Run from GitHub: requested branch, defaulting to master.
    git_ref = options[:branch] || "master"
  else
    # Snapshot local code to the cloud and run that exact ref.
    git_ref = sync_code_with_cloud()
  end

  # No cluster specified: prefer the largest running cluster with no jobs,
  # otherwise default to provisioning a new 2-node cluster.
  unless options[:clusterid] || options[:clustersize]
    clusters = api.get_clusters(Mortar::API::Jobs::CLUSTER_BACKEND__EMR_HADOOP_2).body['clusters']

    largest_free_cluster = clusters.select{ |c| \
      c['running_jobs'].length == 0 && c['status_code'] == Mortar::API::Clusters::STATUS_RUNNING }.
      max_by{|c| c['size']}

    if largest_free_cluster.nil?
      options[:clustersize] = 2
      display("Defaulting to running job on new cluster of size 2")
    else
      options[:clusterid] = largest_free_cluster['cluster_id']
      display("Defaulting to running job on largest existing free cluster, id = " + 
              largest_free_cluster['cluster_id'] + ", size = " + largest_free_cluster['size'].to_s)
    end
  end

  # BUGFIX: `script` is nil on the --project path; calling script.rel_path
  # unconditionally raised NoMethodError. Send nil for the path instead.
  project_script_path = script.nil? ? nil : script.rel_path

  response = action("Requesting job execution") do
    if options[:clustersize]
      if options[:singlejobcluster] && options[:permanentcluster]
        error("Cannot declare cluster as both --singlejobcluster and --permanentcluster")
      end
      cluster_size = options[:clustersize].to_i
      # Persistent is the default lifetime; the two flags below override it.
      cluster_type = Mortar::API::Jobs::CLUSTER_TYPE__PERSISTENT
      if options[:singlejobcluster]
        cluster_type = Mortar::API::Jobs::CLUSTER_TYPE__SINGLE_JOB
      elsif options[:permanentcluster]
        cluster_type = Mortar::API::Jobs::CLUSTER_TYPE__PERMANENT
      end
      use_spot_instances = options[:spot] || false
      api.post_spark_job_new_cluster(project_name, script_name, git_ref, cluster_size, 
        :project_script_path => project_script_path,
        :script_arguments => script_arguments,
        :cluster_type => cluster_type,
        :use_spot_instances => use_spot_instances).body
    else
      cluster_id = options[:clusterid]
      api.post_spark_job_existing_cluster(project_name, script_name, git_ref, cluster_id,
        :project_script_path => project_script_path,
        :script_arguments => script_arguments).body
    end
  end
  
  display("job_id: #{response['job_id']}")
  display
  display("Job status can be viewed on the web at:\n\n #{response['web_job_url']}")
  display

  response['job_id']
end