Class: SparkToolkit::Spark::Client
- Inherits:
-
Object
- Object
- SparkToolkit::Spark::Client
- Defined in:
- lib/spark_toolkit/spark/client.rb
Instance Method Summary collapse
- #active_kerberos ⇒ Object
- #get_spark_conf ⇒ Object
-
#initialize(hconf) ⇒ Client
constructor
A new instance of Client.
- #is_python_job(t) ⇒ Object
- #set_app_name(s) ⇒ Object
- #yarn_deploy_mode(mode) ⇒ Object
- #yarn_run(args) ⇒ Object
-
#yarn_submit(args) ⇒ Object
Returns the YARN application ID of the submitted job.
Constructor Details
#initialize(hconf) ⇒ Client
Returns a new instance of Client.
7 8 9 10 11 12 |
# File 'lib/spark_toolkit/spark/client.rb', line 7 def initialize(hconf) @hconf = hconf UserGroupInformation.set_configuration(@hconf) @sconf = org.apache.spark.SparkConf.new @sconf.set_spark_home(ENV['SPARK_HOME']) if ENV['SPARK_HOME'] end |
Instance Method Details
#active_kerberos ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/spark_toolkit/spark/client.rb', line 66 def active_kerberos prepare_yarn_propreties @sconf.set("spark.hadoop.hadoop.security.authentication", "kerberos") @sconf.set("spark.hadoop.hadoop.security.authorization", "true") UserGroupInformation.set_configuration(SparkHadoopUtil.get.newConfiguration(@sconf)) credentials = UserGroupInformation.getLoginUser.getCredentials SparkHadoopUtil.get.addCurrentUserCredentials(credentials) end |
#get_spark_conf ⇒ Object
14 15 16 |
# File 'lib/spark_toolkit/spark/client.rb', line 14 def get_spark_conf @sconf end |
#is_python_job(t) ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/spark_toolkit/spark/client.rb', line 47 def is_python_job t if t @sconf.set('spark.yarn.isPython', 'true') else @sconf.set('spark.yarn.isPython', 'false') end end |
#set_app_name(s) ⇒ Object
18 19 20 |
# File 'lib/spark_toolkit/spark/client.rb', line 18 def set_app_name s @sconf.set_app_name s end |
#yarn_deploy_mode(mode) ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/spark_toolkit/spark/client.rb', line 55 def yarn_deploy_mode mode case mode when :cluster @sconf.set('spark.submit.deployMode', 'cluster') when :client @sconf.set('spark.submit.deployMode', 'client') else fail "Unsupported deploy mode!" end end |
#yarn_run(args) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/spark_toolkit/spark/client.rb', line 36 def yarn_run(args) prepare_yarn_propreties begin cli_args = org.apache.spark.deploy.yarn.ClientArguments.new(args) rescue ArgumentError # Spark 1.x cli_args = org.apache.spark.deploy.yarn.ClientArguments.new(args, @sconf) end client = org.apache.spark.deploy.yarn.Client.new(cli_args, @hconf, @sconf) client.run end |
#yarn_submit(args) ⇒ Object
Returns the YARN ApplicationId of the submitted job.
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/spark_toolkit/spark/client.rb', line 25 def yarn_submit(args) prepare_yarn_propreties begin cli_args = org.apache.spark.deploy.yarn.ClientArguments.new(args) rescue ArgumentError # Spark 1.x cli_args = org.apache.spark.deploy.yarn.ClientArguments.new(args, @sconf) end client = org.apache.spark.deploy.yarn.Client.new(cli_args, @hconf, @sconf) client.submit_application end |