Spark Source Code Reading - Miscellaneous Notes (1): SparkConf
Published: 2019-06-25


0 About These Miscellaneous Notes

"Miscellaneous notes" are simply things I feel are worth writing down so that I can find them again later.

1 Spark Version

Spark 2.1.0.

2 Remarks

Much of the Scala knowledge that comes up while reading the source is covered in my earlier Scala notes, which can be used as a reference; most of the relevant material is touched on there.

3 SparkConf Source Code

The SparkConf source is not particularly difficult. The main prerequisites are some understanding of Spark itself and a reasonable grasp of Scala; with those, it is not hard to follow. I only read the core methods to get an overall picture, added some personal annotations along the way, and skipped usages that have not come up yet. The annotated source is recorded below.
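Before the listing, here is a minimal usage sketch (the master and app name values are just illustrative) showing the chained-setter style that the class doc describes:

import org.apache.spark.SparkConf

val conf = new SparkConf()   // also loads any spark.* JVM system properties
  .setMaster("local[4]")     // each setter calls set(...) and returns this
  .setAppName("MyApp")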

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.avro.{Schema, SchemaNormalization}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

/**
 * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
 *
 * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
 * values from any `spark.*` Java system properties set in your application as well. In this case,
 * parameters you set directly on the `SparkConf` object take priority over system properties.
 *
 * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
 * get the same configuration no matter what the system properties are.
 *
 * All setter methods in this class support chaining. For example, you can write
 * `new SparkConf().setMaster("local").setAppName("My app")`.
 * Leaf Note: the reason these properties can be set so elegantly is that, looking at these
 * methods, each one ends by calling set and returning this, the object itself.
 *
 * @param loadDefaults whether to also load values from Java system properties
 *
 * @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
 * by the user. Spark does not support modifying the configuration at runtime.
 */
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  /** Leaf Note:
   * extends ... with ... mixes in several traits; a trait is similar to an interface in Java.
   * Logging is for logging. Serializable: in a distributed environment the SparkConf object
   * gets passed around, so of course it must be serializable.
   * As for Cloneable: the clone() method overridden below simply produces a SparkConf with
   * identical settings. The goal is to avoid concurrency problems when multiple components
   * share the same SparkConf object: each component that uses it gets its own clone.
   * Anywhere a SparkConf object is needed, call clone to copy one; very elegant.
   */

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  /** Leaf Note: this is an auxiliary constructor; it delegates to the primary constructor,
   *  i.e. the one whose parameters appear in the class definition, which is the primary
   *  constructor by Scala's definition. */
  def this() = this(true)

  // Leaf Note: a thread-safe map; this is what actually stores Spark's configuration properties
  private val settings = new ConcurrentHashMap[String, String]()

  @transient private lazy val reader: ConfigReader = {
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  /* Leaf Note: the "spark" in private[spark] is actually the org.apache.spark package;
     it means this method can only be used within that package */
  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
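  // Note: with loadDefaults = true, every JVM system property whose key starts with
  // "spark." is copied into `settings` here; anything set later through the setters
  // below simply overwrites these entries, which is why explicit SparkConf settings
  // take priority over system properties.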
  /** Set a configuration variable. */
  def set(key: String, value: String): SparkConf = {
    set(key, value, false)
  }

  /** Leaf Note: if silent is false, i.e. not silent, then for deprecated configuration
   *  keys a warning will be logged */
  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
    if (key == null) {
      throw new NullPointerException("null key")
    }
    if (value == null) {
      throw new NullPointerException("null value for " + key)
    }
    if (!silent) {
      logDeprecationWarning(key)
    }
    settings.put(key, value)
    this
  }

  private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.stringConverter(value))
    this
  }

  private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
    set(entry.key, entry.rawStringConverter(value))
    this
  }

  /**
   * The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
   * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
   */
  def setMaster(master: String): SparkConf = {
    set("spark.master", master)
  }

  /** Set a name for your application. Shown in the Spark web UI. */
  def setAppName(name: String): SparkConf = {
    set("spark.app.name", name)
  }

  /** Set JAR files to distribute to the cluster. */
  def setJars(jars: Seq[String]): SparkConf = {
    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
    set("spark.jars", jars.filter(_ != null).mkString(","))
  }

  /** Set JAR files to distribute to the cluster. (Java-friendly version.) */
  def setJars(jars: Array[String]): SparkConf = {
    setJars(jars.toSeq)
  }

  /**
   * Set an environment variable to be used when launching executors for this application.
   * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
   * (for example spark.executorEnv.PATH) but this method makes them easier to set.
   */
  def setExecutorEnv(variable: String, value: String): SparkConf = {
    set("spark.executorEnv." + variable, value)
  }

  /**
   * Set multiple environment variables to be used when launching executors.
   * These variables are stored as properties of the form spark.executorEnv.VAR_NAME
   * (for example spark.executorEnv.PATH) but this method makes them easier to set.
   */
  def setExecutorEnv(variables: Seq[(String, String)]): SparkConf = {
    for ((k, v) <- variables) {
      setExecutorEnv(k, v)
    }
    this
  }

  /**
   * Set multiple environment variables to be used when launching executors.
   * (Java-friendly version.)
   */
  def setExecutorEnv(variables: Array[(String, String)]): SparkConf = {
    setExecutorEnv(variables.toSeq)
  }
  /**
   * Set the location where Spark is installed on worker nodes.
   */
  def setSparkHome(home: String): SparkConf = {
    set("spark.home", home)
  }

  /** Set multiple parameters together */
  def setAll(settings: Traversable[(String, String)]): SparkConf = {
    settings.foreach { case (k, v) => set(k, v) }
    this
  }

  /** Set a parameter if it isn't already configured */
  def setIfMissing(key: String, value: String): SparkConf = {
    if (settings.putIfAbsent(key, value) == null) {
      logDeprecationWarning(key)
    }
    this
  }

  private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = {
    if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) {
      logDeprecationWarning(entry.key)
    }
    this
  }

  private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
    if (settings.putIfAbsent(entry.key, entry.rawStringConverter(value)) == null) {
      logDeprecationWarning(entry.key)
    }
    this
  }

  /**
   * Use Kryo serialization and register the given set of classes with Kryo.
   * If called multiple times, this will append the classes from all calls together.
   */
  def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
    val allClassNames = new LinkedHashSet[String]()
    allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').map(_.trim)
      .filter(!_.isEmpty)
    allClassNames ++= classes.map(_.getName)

    set("spark.kryo.classesToRegister", allClassNames.mkString(","))
    set("spark.serializer", classOf[KryoSerializer].getName)
    this
  }

  private final val avroNamespace = "avro.schema."

  /**
   * Use Kryo serialization and register the given set of Avro schemas so that the generic
   * record serializer can decrease network IO
   */
  def registerAvroSchemas(schemas: Schema*): SparkConf = {
    for (schema <- schemas) {
      set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
    }
    this
  }

  /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
  def getAvroSchema: Map[Long, String] = {
    getAll.filter { case (k, v) => k.startsWith(avroNamespace) }
      .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) }
      .toMap
  }

  /** Remove a parameter from the configuration */
  def remove(key: String): SparkConf = {
    settings.remove(key)
    this
  }

  private[spark] def remove(entry: ConfigEntry[_]): SparkConf = {
    remove(entry.key)
  }

  /** Get a parameter; throws a NoSuchElementException if it's not set */
  def get(key: String): String = {
    getOption(key).getOrElse(throw new NoSuchElementException(key))
  }

  /** Get a parameter, falling back to a default if not set */
  def get(key: String, defaultValue: String): String = {
    getOption(key).getOrElse(defaultValue)
  }

  /**
   * Retrieves the value of a pre-defined configuration entry.
   *
   * - This is an internal Spark API.
   * - The return type is defined by the configuration entry.
   * - This will throw an exception if the config is not optional and the value is not set.
   */
  private[spark] def get[T](entry: ConfigEntry[T]): T = {
    entry.readFrom(reader)
  }

  /**
   * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then seconds are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getTimeAsSeconds(key: String): Long = {
    Utils.timeStringAsSeconds(get(key))
  }
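  // For instance, a value of "120s" yields 120 and "5min" yields 300; a bare number
  // such as "120" is interpreted in the assumed unit (seconds here). The parsing is
  // all delegated to Utils.timeStringAsSeconds.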
  /**
   * Get a time parameter as seconds, falling back to a default if not set. If no
   * suffix is provided then seconds are assumed.
   */
  def getTimeAsSeconds(key: String, defaultValue: String): Long = {
    Utils.timeStringAsSeconds(get(key, defaultValue))
  }

  /**
   * Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then milliseconds are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getTimeAsMs(key: String): Long = {
    Utils.timeStringAsMs(get(key))
  }

  /**
   * Get a time parameter as milliseconds, falling back to a default if not set. If no
   * suffix is provided then milliseconds are assumed.
   */
  def getTimeAsMs(key: String, defaultValue: String): Long = {
    Utils.timeStringAsMs(get(key, defaultValue))
  }

  /**
   * Get a size parameter as bytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then bytes are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getSizeAsBytes(key: String): Long = {
    Utils.byteStringAsBytes(get(key))
  }

  /**
   * Get a size parameter as bytes, falling back to a default if not set. If no
   * suffix is provided then bytes are assumed.
   */
  def getSizeAsBytes(key: String, defaultValue: String): Long = {
    Utils.byteStringAsBytes(get(key, defaultValue))
  }

  /**
   * Get a size parameter as bytes, falling back to a default if not set.
   */
  def getSizeAsBytes(key: String, defaultValue: Long): Long = {
    Utils.byteStringAsBytes(get(key, defaultValue + "B"))
  }

  /**
   * Get a size parameter as Kibibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Kibibytes are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getSizeAsKb(key: String): Long = {
    Utils.byteStringAsKb(get(key))
  }

  /**
   * Get a size parameter as Kibibytes, falling back to a default if not set. If no
   * suffix is provided then Kibibytes are assumed.
   */
  def getSizeAsKb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsKb(get(key, defaultValue))
  }

  /**
   * Get a size parameter as Mebibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Mebibytes are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getSizeAsMb(key: String): Long = {
    Utils.byteStringAsMb(get(key))
  }

  /**
   * Get a size parameter as Mebibytes, falling back to a default if not set. If no
   * suffix is provided then Mebibytes are assumed.
   */
  def getSizeAsMb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsMb(get(key, defaultValue))
  }

  /**
   * Get a size parameter as Gibibytes; throws a NoSuchElementException if it's not set. If no
   * suffix is provided then Gibibytes are assumed.
   * @throws java.util.NoSuchElementException
   */
  def getSizeAsGb(key: String): Long = {
    Utils.byteStringAsGb(get(key))
  }
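  // Likewise for sizes: getSizeAsMb on "1g" yields 1024, and a bare number is read in
  // the unit the method assumes (mebibytes here), via the Utils.byteStringAs* helpers.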
  /**
   * Get a size parameter as Gibibytes, falling back to a default if not set. If no
   * suffix is provided then Gibibytes are assumed.
   */
  def getSizeAsGb(key: String, defaultValue: String): Long = {
    Utils.byteStringAsGb(get(key, defaultValue))
  }

  /** Get a parameter as an Option */
  def getOption(key: String): Option[String] = {
    Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
  }

  /** Get all parameters as a list of pairs */
  def getAll: Array[(String, String)] = {
    settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
  }

  /**
   * Get all parameters that start with `prefix`
   */
  def getAllWithPrefix(prefix: String): Array[(String, String)] = {
    getAll.filter { case (k, v) => k.startsWith(prefix) }
      .map { case (k, v) => (k.substring(prefix.length), v) }
  }

  /** Get a parameter as an integer, falling back to a default if not set */
  def getInt(key: String, defaultValue: Int): Int = {
    getOption(key).map(_.toInt).getOrElse(defaultValue)
  }

  /** Get a parameter as a long, falling back to a default if not set */
  def getLong(key: String, defaultValue: Long): Long = {
    getOption(key).map(_.toLong).getOrElse(defaultValue)
  }

  /** Get a parameter as a double, falling back to a default if not set */
  def getDouble(key: String, defaultValue: Double): Double = {
    getOption(key).map(_.toDouble).getOrElse(defaultValue)
  }

  /** Get a parameter as a boolean, falling back to a default if not set */
  def getBoolean(key: String, defaultValue: Boolean): Boolean = {
    getOption(key).map(_.toBoolean).getOrElse(defaultValue)
  }

  /** Get all executor environment variables set on this SparkConf */
  def getExecutorEnv: Seq[(String, String)] = {
    getAllWithPrefix("spark.executorEnv.")
  }

  /**
   * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
   * from the start in the Executor.
   */
  def getAppId: String = get("spark.app.id")

  /** Does the configuration contain a given parameter? */
  def contains(key: String): Boolean = {
    settings.containsKey(key) ||
      configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
  }

  private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)

  /** Copy this object */
  /** Leaf Note: clones the settings of this SparkConf object into a new SparkConf object */
  override def clone: SparkConf = {
    val cloned = new SparkConf(false)
    settings.entrySet().asScala.foreach { e =>
      cloned.set(e.getKey(), e.getValue(), true)
    }
    cloned
  }

  /**
   * By using this instead of System.getenv(), environment variables can be mocked
   * in unit tests.
   */
  private[spark] def getenv(name: String): String = System.getenv(name)
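  // In practice, any component that needs its own view of the configuration takes
  // conf.clone and can then mutate its copy without affecting the original.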
  /**
   * Checks for illegal or deprecated config settings. Throws an exception for the former. Not
   * idempotent - may mutate this conf object to convert deprecated settings to supported ones.
   */
  private[spark] def validateSettings() {
    if (contains("spark.local.dir")) {
      val msg = "In Spark 1.0 and later spark.local.dir will be overridden by the value set by " +
        "the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN)."
      logWarning(msg)
    }

    val executorOptsKey = "spark.executor.extraJavaOptions"
    val executorClasspathKey = "spark.executor.extraClassPath"
    val driverOptsKey = "spark.driver.extraJavaOptions"
    val driverClassPathKey = "spark.driver.extraClassPath"
    val driverLibraryPathKey = "spark.driver.extraLibraryPath"
    val sparkExecutorInstances = "spark.executor.instances"

    // Used by Yarn in 1.1 and before
    sys.props.get("spark.driver.libraryPath").foreach { value =>
      val warning =
        s"""
          |spark.driver.libraryPath was detected (set to '$value').
          |This is deprecated in Spark 1.2+.
          |
          |Please instead use: $driverLibraryPathKey
        """.stripMargin
      logWarning(warning)
    }

    // Validate spark.executor.extraJavaOptions
    getOption(executorOptsKey).foreach { javaOpts =>
      if (javaOpts.contains("-Dspark")) {
        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
          "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
        throw new Exception(msg)
      }
      if (javaOpts.contains("-Xmx")) {
        val msg = s"$executorOptsKey is not allowed to specify max heap memory settings " +
          s"(was '$javaOpts'). Use spark.executor.memory instead."
        throw new Exception(msg)
      }
    }

    // Validate memory fractions
    val deprecatedMemoryKeys = Seq(
      "spark.storage.memoryFraction",
      "spark.shuffle.memoryFraction",
      "spark.shuffle.safetyFraction",
      "spark.storage.unrollFraction",
      "spark.storage.safetyFraction")
    val memoryKeys = Seq(
      "spark.memory.fraction",
      "spark.memory.storageFraction") ++
      deprecatedMemoryKeys
    for (key <- memoryKeys) {
      val value = getDouble(key, 0.5)
      if (value > 1 || value < 0) {
        throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
      }
    }

    // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
    val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
    val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
    if (!legacyMemoryManagement) {
      val keyset = deprecatedMemoryKeys.toSet
      val detected = settings.keys().asScala.filter(keyset.contains)
      if (detected.nonEmpty) {
        logWarning("Detected deprecated memory fraction settings: " +
          detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
          "memory management are unified. All memory fractions used in the old model are " +
          "now deprecated and no longer read. If you wish to use the old memory management, " +
          s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
      }
    }

    // Check for legacy configs
    sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
      val warning =
        s"""
          |SPARK_JAVA_OPTS was detected (set to '$value').
          |This is deprecated in Spark 1.0+.
          |
          |Please instead use:
          | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
          | - ./spark-submit with --driver-java-options to set -X options for a driver
          | - spark.executor.extraJavaOptions to set -X options for executors
          | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
        """.stripMargin
      logWarning(warning)

      for (key <- Seq(executorOptsKey, driverOptsKey)) {
        if (getOption(key).isDefined) {
          throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
        } else {
          logWarning(s"Setting '$key' to '$value' as a work-around.")
          set(key, value)
        }
      }
    }

    sys.env.get("SPARK_CLASSPATH").foreach { value =>
      val warning =
        s"""
          |SPARK_CLASSPATH was detected (set to '$value').
          |This is deprecated in Spark 1.0+.
          |
          |Please instead use:
          | - ./spark-submit with --driver-class-path to augment the driver classpath
          | - spark.executor.extraClassPath to augment the executor classpath
        """.stripMargin
      logWarning(warning)

      for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
        if (getOption(key).isDefined) {
          throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
        } else {
          logWarning(s"Setting '$key' to '$value' as a work-around.")
          set(key, value)
        }
      }
    }

    if (!contains(sparkExecutorInstances)) {
      sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
        val warning =
          s"""
             |SPARK_WORKER_INSTANCES was detected (set to '$value').
             |This is deprecated in Spark 1.0+.
             |
             |Please instead use:
             | - ./spark-submit with --num-executors to specify the number of executors
             | - Or set SPARK_EXECUTOR_INSTANCES
             | - spark.executor.instances to configure the number of instances in the spark config.
        """.stripMargin
        logWarning(warning)
        set("spark.executor.instances", value)
      }
    }

    if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
      val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
        "instead use \"yarn\" with specified deploy mode."

      get("spark.master") match {
        case "yarn-cluster" =>
          logWarning(warning)
          set("spark.master", "yarn")
          set("spark.submit.deployMode", "cluster")
        case "yarn-client" =>
          logWarning(warning)
          set("spark.master", "yarn")
          set("spark.submit.deployMode", "client")
        case _ => // Any other unexpected master will be checked when creating scheduler backend.
      }
    }

    if (contains("spark.submit.deployMode")) {
      get("spark.submit.deployMode") match {
        case "cluster" | "client" =>
        case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
          "\"client\".")
      }
    }
  }

  /**
   * Return a string listing all keys and values, one per line. This is useful to print the
   * configuration out for debugging.
   */
  def toDebugString: String = {
    getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
  }

}
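// Tip: sc.getConf.toDebugString prints the sorted "key=value" pairs, which is handy
// for checking which settings an application actually picked up.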
private[spark] object SparkConf extends Logging {

  /**
   * Maps deprecated config keys to information about the deprecation.
   *
   * The extra information is logged as a warning when the config is present in the user's
   * configuration.
   */
  private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
    val configs = Seq(
      DeprecatedConfig("spark.cache.class", "0.8",
        "The spark.cache.class property is no longer being used! Specify storage levels using " +
        "the RDD.persist() method instead."),
      DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
        "Please use spark.{driver,executor}.userClassPathFirst instead."),
      DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
        "Please use spark.kryoserializer.buffer instead. The default value for " +
          "spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
          "are no longer accepted. To specify the equivalent now, one may use '64k'."),
      DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
      DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
        "Please use the new blacklisting options, spark.blacklist.*")
    )

    Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
  }

  /**
   * Maps a current config key to alternate keys that were used in previous version of Spark.
   *
   * The alternates are used in the order defined in this map. If deprecated configs are
   * present in the user's configuration, a warning is logged.
   */
  private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
    "spark.executor.userClassPathFirst" -> Seq(
      AlternateConfig("spark.files.userClassPathFirst", "1.3")),
    "spark.history.fs.update.interval" -> Seq(
      AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
      AlternateConfig("spark.history.fs.updateInterval", "1.3"),
      AlternateConfig("spark.history.updateInterval", "1.3")),
    "spark.history.fs.cleaner.interval" -> Seq(
      AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
    "spark.history.fs.cleaner.maxAge" -> Seq(
      AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
    "spark.yarn.am.waitTime" -> Seq(
      AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
        // Translate old value to a duration, with 10s wait time per try.
        translation = s => s"${s.toLong * 10}s")),
    "spark.reducer.maxSizeInFlight" -> Seq(
      AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
    "spark.kryoserializer.buffer" ->
        Seq(AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
          translation = s => s"${(s.toDouble * 1000).toInt}k")),
    "spark.kryoserializer.buffer.max" -> Seq(
      AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
    "spark.shuffle.file.buffer" -> Seq(
      AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
    "spark.executor.logs.rolling.maxSize" -> Seq(
      AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
    "spark.io.compression.snappy.blockSize" -> Seq(
      AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
    "spark.io.compression.lz4.blockSize" -> Seq(
      AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
    "spark.rpc.numRetries" -> Seq(
      AlternateConfig("spark.akka.num.retries", "1.4")),
    "spark.rpc.retry.wait" -> Seq(
      AlternateConfig("spark.akka.retry.wait", "1.4")),
    "spark.rpc.askTimeout" -> Seq(
      AlternateConfig("spark.akka.askTimeout", "1.4")),
    "spark.rpc.lookupTimeout" -> Seq(
      AlternateConfig("spark.akka.lookupTimeout", "1.4")),
    "spark.streaming.fileStream.minRememberDuration" -> Seq(
      AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
    "spark.yarn.max.executor.failures" -> Seq(
      AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
    "spark.memory.offHeap.enabled" -> Seq(
      AlternateConfig("spark.unsafe.offHeap", "1.6")),
    "spark.rpc.message.maxSize" -> Seq(
      AlternateConfig("spark.akka.frameSize", "1.6")),
    "spark.yarn.jars" -> Seq(
      AlternateConfig("spark.yarn.jar", "2.0"))
    )

  /**
   * A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
   * config keys.
   *
   * Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
   */
  private val allAlternatives: Map[String, (String, AlternateConfig)] = {
    configsWithAlternatives.keys.flatMap { key =>
      configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
    }.toMap
  }

  /**
   * Return whether the given config should be passed to an executor on start-up.
   *
   * Certain authentication configs are required from the executor when it connects to
   * the scheduler, while the rest of the spark configs can be inherited from the driver later.
   */
  def isExecutorStartupConf(name: String): Boolean = {
    (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
    name.startsWith("spark.ssl") ||
    name.startsWith("spark.rpc") ||
    isSparkPortConf(name)
  }

  /**
   * Return true if the given config matches either `spark.*.port` or `spark.port.*`.
   */
  def isSparkPortConf(name: String): Boolean = {
    (name.startsWith("spark.") && name.endsWith(".port")) || name.startsWith("spark.port.")
  }

  /**
   * Looks for available deprecated keys for the given config option, and return the first
   * value available.
   */
  def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
    configsWithAlternatives.get(key).flatMap { alts =>
      alts.collectFirst { case alt if conf.contains(alt.key) =>
        val value = conf.get(alt.key)
        if (alt.translation != null) alt.translation(value) else value
      }
    }
  }
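  // Example: if only spark.kryoserializer.buffer.mb=0.064 is set, getDeprecatedConfig
  // returns Some("64k") for spark.kryoserializer.buffer, applying the translation above.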
  /**
   * Logs a warning message if the given config key is deprecated.
   */
  def logDeprecationWarning(key: String): Unit = {
    deprecatedConfigs.get(key).foreach { cfg =>
      logWarning(
        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
        s"may be removed in the future. ${cfg.deprecationMessage}")
      return
    }

    allAlternatives.get(key).foreach { case (newKey, cfg) =>
      logWarning(
        s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
        s"may be removed in the future. Please use the new key '$newKey' instead.")
      return
    }
    if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
      logWarning(
        s"The configuration key $key is not supported any more " +
          s"because Spark doesn't use Akka since 2.0")
    }
  }

  /**
   * Holds information about keys that have been deprecated and do not have a replacement.
   *
   * @param key The deprecated key.
   * @param version Version of Spark where key was deprecated.
   * @param deprecationMessage Message to include in the deprecation warning.
   */
  private case class DeprecatedConfig(
      key: String,
      version: String,
      deprecationMessage: String)

  /**
   * Information about an alternate configuration key that has been deprecated.
   *
   * @param key The deprecated config key.
   * @param version The Spark version in which the key was deprecated.
   * @param translation A translation function for converting old config values into new ones.
   */
  private case class AlternateConfig(
      key: String,
      version: String,
      translation: String => String = null)

}
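
To tie these pieces together, here is a small self-contained sketch that exercises the behaviours noted above. It assumes Spark 2.1.0 on the classpath; the object name SparkConfDemo is made up for illustration, while the config keys used are real Spark keys:

import org.apache.spark.SparkConf

object SparkConfDemo {
  def main(args: Array[String]): Unit = {
    // spark.* system properties are loaded by the default constructor,
    // but explicit settings on the SparkConf win.
    System.setProperty("spark.app.name", "from-system-property")
    val conf = new SparkConf().setMaster("local[2]").setAppName("demo")
    println(conf.get("spark.app.name"))                              // demo

    // A deprecated alternate key is readable through the new key, translated on the fly.
    conf.set("spark.kryoserializer.buffer.mb", "0.064")              // logs a deprecation warning
    println(conf.get("spark.kryoserializer.buffer"))                 // 64k

    // Suffix-aware getters.
    println(conf.getTimeAsSeconds("spark.network.timeout", "120s"))  // 120
    println(conf.getSizeAsMb("spark.driver.maxResultSize", "1g"))    // 1024

    // clone gives each caller an isolated copy of the settings.
    val copy = conf.clone
    copy.set("spark.locality.wait", "10s")
    println(conf.contains("spark.locality.wait"))                    // false
  }
}

Note that set on a deprecated key logs a warning via logDeprecationWarning; this is exactly the silent = false path shown earlier.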

Reposted from: https://blog.51cto.com/xpleaf/2173008
