当前位置: 代码迷 >> 综合 >> SparkContext
  详细解决方案

SparkContext

热度:96   发布时间:2023-12-23 18:33:18.0

1.简介:SparkContext是Spark的驱动器,她的初始化中包含任务调度器、资源管理器、事件管理器和状态跟踪器等。

 

2.构造方法

主构造方法

//传入配置参数SparkConf
class SparkContext(config: SparkConf) extends Logging 

辅助构造方法

def this() = this(new SparkConf())  //无参构造方法传入默认配置//master:运行环境,如:"local","yarn"等 appName:应用名称 conf:配置
def this(master: String, appName: String, conf: SparkConf) =   //调用主构造方法,还有几个类似辅助构造方法,调用updateConf ↓this(SparkContext.updatedConf(conf, master, appName))      //创建SparkConf的修改版本,将传参配置设置到SparkConf中
private[spark] def updatedConf(conf: SparkConf,master: String,appName: String,sparkHome: String = null,jars: Seq[String] = Nil,environment: Map[String, String] = Map()): SparkConf =  {val res = conf.clone()res.setMaster(master)res.setAppName(appName)if (sparkHome != null) {res.setSparkHome(sparkHome)}if (jars != null && !jars.isEmpty) {res.setJars(jars)}res.setExecutorEnv(environment.toSeq)res}

 

3.主要成员

//部分将在初始化中说明
private var _conf: SparkConf = _
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _listenerBus: LiveListenerBus = _
private var _env: SparkEnv = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
private var _ui: Option[SparkUI] = None
private var _hadoopConfiguration: Configuration = _
private var _executorMemory: Int = _
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _
private var _heartbeatReceiver: RpcEndpointRef = _
@volatile private var _dagScheduler: DAGScheduler = _
private var _applicationId: String = _
private var _applicationAttemptId: Option[String] = None
private var _eventLogger: Option[EventLoggingListener] = None
private var _driverLogger: Option[DriverLogger] = None
private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
private var _cleaner: Option[ContextCleaner] = None
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _

 

4.初始化

try {_conf = config.clone()  //克隆构造方法传入的配置_conf.validateSettings()  //检查非法或过时的配置设置if (!_conf.contains("spark.master")) {   //如果没有设置运行环境,抛出异常throw new SparkException("A master URL must be set in your configuration")}if (!_conf.contains("spark.app.name")) {  //如果没有设置应用名称,抛出异常throw new SparkException("An application name must be set in your configuration")}_driverLogger = DriverLogger(_conf)   //Ctrl点不进去,英文翻译为:驱动程序记录器val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)    //驱动程序资源文件//英文翻译为:获取或发现所有资源_resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt) logInfo(s"Submitted application: $appName")//如果用户代码在YARN群集上由AM运行,则必须设置系统属性spark.yarn.app.idif (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")}if (_conf.getBoolean("spark.logConf", false)) {logInfo("Spark configuration:\n" + _conf.toDebugString)}// 设置Spark驱动程序主机和端口系统属性。这显式设置了配置,而不是依赖于配置常量的默认值。 _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))_conf.setIfMissing(DRIVER_PORT, 0)_conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)_jars = Utils.getUserJars(_conf)  //返回jars文件_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten_eventLogDir =if (isEventLogEnabled) {val unresolvedDir = conf.get(EVENT_LOG_DIR).stripSuffix("/")//为用户输入字符串描述的文件返回格式正确的URISome(Utils.resolveURI(unresolvedDir))  } else {None}_eventLogCodec = {val compress = _conf.get(EVENT_LOG_COMPRESS)if (compress && isEventLogEnabled) {//返回给定编解码器名称的简短版本Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)  } else {None}}//异步将SparkListenerEvents传递给已注册的SparkListeners(_listenerBus)_listenerBus = new LiveListenerBus(_conf)  //在创建SparkEnv之前初始化应用程序状态存储和侦听器,以便它获取所有事件val appStatusSource = AppStatusSource.createSource(conf)_statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)listenerBus.addToStatusQueue(_statusStore.listener.get)// 创建Spark执行环境(缓存,映射输出跟踪器等)_env = createSparkEnv(_conf, isLocal, listenerBus)SparkEnv.set(_env)//如果运行REPL,请将repl的输出目录注册到文件服务器_conf.getOption("spark.repl.class.outputDir").foreach { path =>val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))_conf.set("spark.repl.class.uri", replUri)}//用于监视作业和阶段进度的低级状态报告API_statusTracker = new SparkStatusTracker(this, _statusStore) _progressBar =if (_conf.get(UI_SHOW_CONSOLE_PROGRESS)) {//ConsoleProgressBar显示控制台下一行中的阶段进度Some(new ConsoleProgressBar(this))  } else {None}_ui =if (conf.get(UI_ENABLED)) {//SparkUI:Spark应用程序的顶级用户界面Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))} else {//对于测试,请不要启用UINone}//在启动任务计划程序之前绑定UI,以便将绑定端口正确地传递给集群管理器_ui.foreach(_.bind())//创建配置可以初始化一些Hadoop子系统_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) //性能优化_hadoopConfiguration.size()//添加通过构造函数给出的每个JARif (jars != null) {jars.foreach(addJar)}if (files != null) {files.foreach(addFile)}_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)  //执行者内存.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))).orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem)).map(Utils.memoryStringToMb).getOrElse(1024)// 将java选项转换为env vars作为解决方法,因为我们无法直接在sbt中设置env varsfor { (envKey, propKey) <- Seq(("SPARK_TESTING", IS_TESTING.key))value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {executorEnvs(envKey) = value}Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>executorEnvs("SPARK_PREPEND_CLASSES") = v}// Mesos调度程序后端依赖此环境变量来设置执行程序内存executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"executorEnvs ++= _conf.getExecutorEnvexecutorEnvs("SPARK_USER") = sparkUser// 我们需要在“createTaskScheduler”之前注册“HeartbeatReceiver”,// 因为Executor将在构造函数中检索“HeartbeatReceiver”_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))// 创建并启动调度程序val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)_schedulerBackend = sched  //调度程序后端_taskScheduler = ts  //任务计划程序_dagScheduler = new DAGScheduler(this) //有向无环图计划程序_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)// 创建并启动heartbeater以收集内存指标_heartbeater = new Heartbeater(env.memoryManager,() => SparkContext.this.reportHeartBeat(),"driver-heartbeater",conf.get(EXECUTOR_HEARTBEAT_INTERVAL))_heartbeater.start()//taskScheduler在DAGScheduler的构造函数中设置DAGScheduler引用后启动TaskScheduler_taskScheduler.start()_applicationId = _taskScheduler.applicationId()  //获取应用Id_applicationAttemptId = _taskScheduler.applicationAttemptId()_conf.set("spark.app.id", _applicationId)if (_conf.get(UI_REVERSE_PROXY)) {//设置指定键指示的系统属性System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId) }_ui.foreach(_.setAppId(_applicationId))_env.blockManager.initialize(_applicationId)  //使用给定的appId初始化BlockManager// Driver的metrics系统需要将spark.app.id设置为app ID//所以它应该在我们从任务调度程序获取应用程序ID并设置spark.app.id之后开始_env.metricsSystem.start()// 在度量系统启动后,将驱动程序指标servlet处理程序附加到web ui_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))_eventLogger =if (isEventLogEnabled) {val logger =//SparkListener,用于将事件记录到持久存储new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration)logger.start()listenerBus.addToEventLogQueue(logger) //将侦听器添加到事件日志队列Some(logger)  //返回事件记录} else {None}_cleaner =if (_conf.get(CLEANER_REFERENCE_TRACKING)) {Some(new ContextCleaner(this))    //用于RDD,shuffle和广播状态的异步清理器} else {None}_cleaner.foreach(_.start())   //启动清理器//返回是否在给定conf中启用动态分配val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)   _executorAllocationManager =   //执行者分配管理器if (dynamicAllocationEnabled) {schedulerBackend match {case b: ExecutorAllocationClient =>Some(new ExecutorAllocationManager(schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,cleaner = cleaner))case _ =>None}} else {None}//start():注册调度程序回调以决定何时添加和删除执行程序,以及启动调度任务_executorAllocationManager.foreach(_.start())   //注册spark.extraListeners中指定的侦听器,然后启动侦听器总线setupAndStartListenerBus()   postEnvironmentUpdate()   //任务计划程序准备好后,发布环境更新事件postApplicationStart()   //发布应用程序启动事件// Post init_taskScheduler.postStartHook()   //Yarn使用它根据首选位置引导资源分配,等待奴隶注册等//注册DAG任务计划测量资源_env.metricsSystem.registerSource(_dagScheduler.metricsSource)   //注册BlockManager资源_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))  _env.metricsSystem.registerSource(new JVMCPUSource())   //注册JVM CPU资源_executorAllocationManager.foreach { e =>//注册执行者分配管理器资源_env.metricsSystem.registerSource(e.executorAllocationManagerSource)   }appStatusSource.foreach(_env.metricsSystem.registerSource(_))   // 如果用户忘记了上下文,请确保上下文已停止。 这避免了离开JVM完全退出后未完成的事件日志。// 如果JVM没有用但是被杀了。logDebug("Adding shutdown hook") //迫切渴望创建记录器//添加具有给定优先级的关闭挂钩。运行具有更高优先级值的挂钩_shutdownHookRef = ShutdownHookManager.addShutdownHook(  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>logInfo("Invoking stop() from shutdown hook")try {stop()} catch {case e: Throwable =>logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)}}} catch {case NonFatal(e) =>logError("Error initializing SparkContext.", e)try {stop()} catch {case NonFatal(inner) =>logError("Error stopping SparkContext after init error.", inner)} finally {throw e}}