Class: SparkToolkit::Spark::Client

Inherits: Object
Defined in:
lib/spark_toolkit/spark/client.rb

Instance Method Summary

Constructor Details

#initialize(hconf) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
# File 'lib/spark_toolkit/spark/client.rb', line 7

# Builds a client around a Hadoop configuration.
#
# The Hadoop configuration is registered with UserGroupInformation straight
# away. A fresh SparkConf is then created; its Spark home is pointed at
# $SPARK_HOME only when that environment variable is present.
#
# @param hconf Hadoop configuration; it is handed to UserGroupInformation here
#   and to the YARN client by #yarn_run / #yarn_submit
#   (presumably an org.apache.hadoop.conf.Configuration — TODO confirm)
def initialize(hconf)
  @hconf = hconf
  UserGroupInformation.set_configuration(@hconf)

  @sconf = org.apache.spark.SparkConf.new
  spark_home = ENV['SPARK_HOME']
  @sconf.set_spark_home(spark_home) unless spark_home.nil?
end

Instance Method Details

#active_kerberos ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/spark_toolkit/spark/client.rb', line 66

# Switches this client's Spark configuration to Kerberos security.
#
# Steps, in order:
# 1. Prepares the YARN properties.
# 2. Turns on Hadoop security authentication ("kerberos") and authorization
#    on the SparkConf.
# 3. Re-initialises UserGroupInformation from a Hadoop configuration derived
#    from that SparkConf.
# 4. Adds the login user's credentials to the current user.
#
# @return whatever SparkHadoopUtil#addCurrentUserCredentials returns
def active_kerberos
  prepare_yarn_propreties

  # The "spark.hadoop." prefix is how Spark forwards a setting into the Hadoop
  # configuration that newConfiguration builds below.
  {
    "spark.hadoop.hadoop.security.authentication" => "kerberos",
    "spark.hadoop.hadoop.security.authorization" => "true"
  }.each { |key, value| @sconf.set(key, value) }

  kerberized_hconf = SparkHadoopUtil.get.newConfiguration(@sconf)
  UserGroupInformation.set_configuration(kerberized_hconf)

  login_credentials = UserGroupInformation.getLoginUser.getCredentials
  SparkHadoopUtil.get.addCurrentUserCredentials(login_credentials)
end

#get_spark_conf ⇒ Object



14
15
16
# File 'lib/spark_toolkit/spark/client.rb', line 14

# Exposes the SparkConf this client has been configuring.
#
# The live object is returned, not a copy. Changes the caller makes to it are
# therefore seen by later #yarn_run / #yarn_submit calls on this client.
#
# @return the SparkConf created in #initialize
def get_spark_conf
  @sconf
end

#is_python_job(t) ⇒ Object



47
48
49
50
51
52
53
# File 'lib/spark_toolkit/spark/client.rb', line 47

# Marks the application as a Python job (or not) for Spark's YARN client.
#
# The flag is stored under 'spark.yarn.isPython' as the string 'true' or
# 'false'. Any truthy +t+ gives 'true'; +nil+ and +false+ give 'false'.
#
# @param t truthy when the submitted application is a Python job
# @return the result of SparkConf#set
def is_python_job(t)
  python_flag = t ? 'true' : 'false'
  @sconf.set('spark.yarn.isPython', python_flag)
end

#set_app_name(s) ⇒ Object



18
19
20
# File 'lib/spark_toolkit/spark/client.rb', line 18

# Sets the Spark application name on this client's SparkConf.
#
# @param s [String] the application name
# @return the value returned by SparkConf#setAppName
#   (Spark returns the conf itself so calls can be chained)
def set_app_name(s)
  @sconf.set_app_name(s)
end

#yarn_deploy_mode(mode) ⇒ Object



55
56
57
58
59
60
61
62
63
64
# File 'lib/spark_toolkit/spark/client.rb', line 55

# Chooses where the driver runs when the application is submitted to YARN.
#
# The symbols :cluster and :client are accepted. So are the equivalent
# strings, which is what callers get when they read the mode from
# configuration or the command line.
#
# @param mode [Symbol, String] :cluster / 'cluster' or :client / 'client'
# @return the result of SparkConf#set
# @raise [RuntimeError] for any other mode; the message names the rejected value
def yarn_deploy_mode(mode)
  case mode
  when :cluster, 'cluster'
    @sconf.set('spark.submit.deployMode', 'cluster')
  when :client, 'client'
    @sconf.set('spark.submit.deployMode', 'client')
  else
    fail "Unsupported deploy mode: #{mode.inspect} (expected :cluster or :client)"
  end
end

#yarn_run(args) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/spark_toolkit/spark/client.rb', line 36

# Builds a Spark YARN client from +args+ and calls Client#run on it.
#
# The ClientArguments constructor differs across Spark versions. The
# one-argument form is tried first; on Spark 1.x it raises ArgumentError, and
# the constructor that also takes the SparkConf is used instead.
#
# @param args arguments for ClientArguments
#   (presumably a list of strings such as ["--jar", path] — TODO confirm)
# @return whatever Client#run returns
def yarn_run(args)
  prepare_yarn_propreties
  yarn = org.apache.spark.deploy.yarn

  client_args =
    begin
      yarn.ClientArguments.new(args)
    rescue ArgumentError # Spark 1.x
      yarn.ClientArguments.new(args, @sconf)
    end

  yarn.Client.new(client_args, @hconf, @sconf).run
end

#yarn_submit(args) ⇒ Object

Returns:

<~AppID> — the application ID of the submitted YARN application.



25
26
27
28
29
30
31
32
33
34
# File 'lib/spark_toolkit/spark/client.rb', line 25

# Builds a Spark YARN client from +args+ and submits the application.
#
# The ClientArguments constructor differs across Spark versions. The
# one-argument form is tried first; on Spark 1.x it raises ArgumentError, and
# the constructor that also takes the SparkConf is used instead.
#
# @param args arguments for ClientArguments
#   (presumably a list of strings such as ["--jar", path] — TODO confirm)
# @return the value of Client#submit_application (documented as <~AppID>)
def yarn_submit(args)
  prepare_yarn_propreties
  yarn = org.apache.spark.deploy.yarn

  client_args =
    begin
      yarn.ClientArguments.new(args)
    rescue ArgumentError # Spark 1.x
      yarn.ClientArguments.new(args, @sconf)
    end

  yarn.Client.new(client_args, @hconf, @sconf).submit_application
end