转载文档:
历时近两年,Apache Spark 3.0.0 正式版终于发布了
动态分区修剪(Dynamic Partition Pruning)
所谓的动态分区裁剪就是基于运行时(run time)推断出来的信息来进一步进行分区裁剪。举个例子,我们有如下的查询:
SELECT * FROM dim_iteblog
JOIN fact_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10
假设 dim_iteblog 表的 dim_iteblog.othercol > 10 过滤出来的数据比较少,但是由于之前版本的 Spark 无法进行动态计算代价,所以可能会导致 fact_iteblog 表扫描出大量无效的数据。有了动态分区裁减,可以在运行的时候过滤掉 fact_iteblog 表无用的数据。经过这个优化,查询扫描的数据大大减少,性能提升了 33 倍。
相关配置:
要启用动态分区裁剪需要将 spark.sql.optimizer.dynamicPartitionPruning.enabled 参数设置为 true(默认),其他相关参数:
- spark.sql.optimizer.dynamicPartitionPruning.useStats:true(默认),When true, distinct count statistics will be used for computing the data size of the partitioned table after dynamic partition pruning, in order to evaluate if it is worth adding an extra subquery as the pruning filter if broadcast reuse is not applicable.
- spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio:0.5,When statistics are not available or configured not to be used, this config will be used as the fallback filter ratio for computing the data size of the partitioned table after dynamic partition pruning, in order to evaluate if it is worth adding an extra subquery as the pruning filter if broadcast reuse is not applicable.
- spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcast:默认为 true,When true, dynamic partition pruning will seek to reuse the broadcast results from a broadcast hash join operation.
详细请参考:
https://www.iteblog.com/archives/8589.html
https://www.iteblog.com/archives/8590.html
自适应查询执行(Adaptive Query Execution)
自适应查询执行(又称 Adaptive Query Optimisation 或者 Adaptive Optimisation)是对查询执行计划的优化,允许 Spark Planner 在运行时执行可选的执行计划,这些计划将基于运行时统计数据进行优化。
早在2015年,Spark 社区就提出了自适应执行的基本想法,在 Spark 的 DAGScheduler 中增加了提交单个 map stage 的接口,并且在实现运行时调整 shuffle partition 数量上做了尝试。但目前该实现有一定的局限性,在某些场景下会引入更多的 shuffle,即更多的 stage,对于三表在同一个 stage 中做 join 等情况也无法很好的处理;而且使用当前框架很难灵活地在自适应执行中实现其他功能,例如更改执行计划或在运行时处理倾斜的 join。
AQE 框架目前提供了三个功能:
- 动态合并 shuffle partitions;
- 动态调整 join 策略;
- 动态优化倾斜的 join(skew joins)。
基于没有统计数据的 1TB TPC-DS 基准,Spark 3.0 可以使 q77 的速度提高8倍,使 q5 的速度提高2倍,而对另外26个查询的速度提高1.1倍以上。可以通过设置 SQL 配置 spark.sql.adaptive=true 来启用 AQE,这个参数默认值为 false。
加速器感知调度(Accelerator-aware Scheduling)
如今大数据和机器学习已经有了很大的结合,在机器学习里面,因为计算迭代的时间可能会很长,开发人员一般会选择使用 GPU、FPGA 或 TPU 来加速计算。在 Apache Hadoop 3.1 版本里面已经开始内置原生支持 GPU 和 FPGA 了。作为通用计算引擎的 Spark 肯定也不甘落后,来自 Databricks、NVIDIA、Google 以及阿里巴巴的工程师们正在为 Apache Spark 添加原生的 GPU 调度支持,该方案填补了 Spark 在 GPU 资源的任务调度方面的空白,有机地融合了大数据处理和 AI 应用,扩展了 Spark 在深度学习、信号处理和各大数据应用的应用场景。
目前 Apache Spark 支持的资源管理器 YARN 和 Kubernetes 已经支持了 GPU。为了让 Spark 也支持 GPUs,在技术层面上需要做出两个主要改变:
在 cluster manager 层面上,需要升级 cluster managers 来支持 GPU。并且给用户提供相关 API,使得用户可以控制 GPU 资源的使用和分配。
在 Spark 内部,需要在 scheduler 层面做出修改,使得 scheduler 可以在用户 task 请求中识别 GPU 的需求,然后根据 executor 上的 GPU 供给来完成分配。
因为让 Apache Spark 支持 GPU 是一个比较大的特性,所以项目分为了几个阶段。在 Apache Spark 3.0 版本,将支持在 standalone、 YARN 以及 Kubernetes 资源管理器下支持 GPU,并且对现有正常的作业基本没影响。对于 TPU 的支持、Mesos 资源管理器中 GPU 的支持、以及 Windows 平台的 GPU 支持将不是这个版本的目标。而且对于一张 GPU 卡内的细粒度调度也不会在这个版本支持;Apache Spark 3.0 版本将把一张 GPU 卡和其内存作为不可分割的单元。
Apache Spark DataSource V2
Data Source API 定义如何从存储系统进行读写的相关 API 接口,比如 Hadoop 的 InputFormat/OutputFormat,Hive 的 Serde 等。这些 API 非常适合用户在 Spark 中使用 RDD 编程的时候使用。使用这些 API 进行编程虽然能够解决我们的问题,但是对用户来说使用成本还是挺高的,而且 Spark 也不能对其进行优化。为了解决这些问题,Spark 1.3 版本开始引入了 Data Source API V1,通过这个 API 我们可以很方便的读取各种来源的数据,而且 Spark 使用 SQL 组件的一些优化引擎对数据源的读取进行优化,比如列裁剪、过滤下推等等。
Data Source API V1 为我们抽象了一系列的接口,使用这些接口可以实现大部分的场景。但是随着使用的用户增多,逐渐显现出一些问题:
- 部分接口依赖 SQLContext 和 DataFrame
- 扩展能力有限,难以下推其他算子
- 缺乏对列式存储读取的支持
- 缺乏分区和排序信息
- 写操作不支持事务
- 不支持流处理
为了解决 Data Source V1 的一些问题,从 Apache Spark 2.3.0 版本开始,社区引入了 Data Source API V2,在保留原有的功能之外,还解决了 Data Source API V1 存在的一些问题,比如不再依赖上层 API,扩展能力增强。
详细内容请参考:
https://www.iteblog.com/archives/2578.html
https://www.iteblog.com/archives/2579.html
丰富的 API 和功能
-
增强的 pandas UDF
-
一组完整的 join hints
尽管社区不断提高编译器的智能性,但不能保证编译器始终可以针对每种情况做出最佳决策。Join 算法的选择基于统计和启发式算法,当编译器无法做出最佳选择时,用户仍然可以使用 join hints 来影响优化器选择更好的计划。Apache Spark 3.0 通过添加新的 hints 扩展了现有的 join hints :SHUFFLE_MERGE、SHUFFLE_HASH 和 SHUFFLE_REPLICATE_NL -
新的内置函数
Scala API 中增了32个新的内置函数和高阶函数。在这些内置函数中,添加了一组针对 MAP 的特定内置函数[transform_key,transform_value,map_entries,map_filter,map_zip_with],以简化对 MAP 数据类型的处理。
增强的监控功能
- 重新设计 Structured streaming 的 UI
Structured streaming 最初是在 Spark 2.0 中引入的。Spark 3.0 为监控这些流作业重新设计了 UI
- 增强 EXPLAIN 命令
读取计划(Reading plans)对理解和调优查询非常重要。现有的解决方案看起来很混乱,每个算子的字符串表示可能非常宽,甚至可能被截断。Spark 3.0 版本使用一种新的格式化(FORMATTED)模式对其进行了增强,并且还提供了将计划转储到文件的功能。 - 可观察的指标
连续监视数据质量的变化是管理数据管道非常需要的特性。Spark 3.0 版本为批处理和流处理应用程序引入了这种功能。可观察指标被命名为可以在查询上定义的任意聚合函数(dataframe)。一旦 dataframe 的执行到达一个完成点(例如,完成批查询),就会发出一个命名事件,其中包含自上一个完成点以来处理的数据的指标。
更好的 ANSI SQL 兼容
PostgreSQL 是最先进的开源数据库之一,其支持 SQL:2011 的大部分主要特性,完全符合 SQL:2011 要求的 179 个功能中,PostgreSQL 至少符合 160 个。Spark 社区目前专门开了一个 ISSUE SPARK-27764 来解决 Spark SQL 和 PostgreSQL 之间的差异,包括功能特性补齐、Bug 修改等。功能补齐包括了支持 ANSI SQL 的一些函数、区分 SQL 保留关键字以及内置函数等。这个 ISSUE 下面对应了 231 个子 ISSUE,如果这部分的 ISSUE 都解决了,那么 Spark SQL 和 PostgreSQL 或者 ANSI SQL:2011 之间的差异更小了。
其他
- SparkR 向量化读写
- Kafka Streaming includeHeaders 支持在消息中配置一些 headers 信息
- Spark on K8S:Spark 对 Kubernetes 的支持是从2.3版本开始的,Spark 2.4 得到提升,Spark 3.0 将会加入 Kerberos 以及资源动态分配的支持。
- Remote Shuffle Service:当前的 Shuffle 有很多问题,比如弹性差、对 NodeManager 有很大影响,不适应云环境。为了解决上面问题,将会引入 Remote Shuffle Service,具体参见 SPARK-25299
- 支持 JDK 11:参见 SPARK-24417,之所以直接选择 JDK 11 是因为 JDK 8 即将达到 EOL(end of life),而 JDK9 和 JDK10 已经是 EOL,所以社区就跳过 JDK9 和 JDK10 而直接支持 JDK11。不过 Spark 3.0 预览版默认还是使用 JDK 1.8;
- 移除对 Scala 2.11 的支持,默认支持 Scala 2.12,具体参见 SPARK-26132
- 支持 Hadoop 3.2,具体参见 SPARK-23534,Hadoop 3.0 已经发布了2年了(Apache Hadoop 3.0.0-beta1 正式发布,下一个版本(GA)即可在线上使用),所以支持 Hadoop 3.0 也是自然的,不过 Spark 3.0 预览版默认还是使用 Hadoop 2.7.4。
- 移除 Python 2.x 的支持:早在 2019年6月社区就有相关的讨论关于在 Spark 3.0 移除对 Python 2 的支持,目前 Spark 3.0.0 默认支持 Python 3.x ,参见 SPARK-27884。
- Spark Graph 支持 Cypher:Cypher 是流行的图查询语言,现在我们可以直接在 Spark 3.0 使用 Cypher。
- Spark event logs 支持 Roll 了,参见 《Spark 3.0 终于支持 event logs 滚动了》