Explanation of the JavaPairRDD coalesce method
Official documentation
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
Meaning

Returns a new RDD reduced to numPartitions partitions. This results in a narrow dependency: for example, going from 1000 partitions down to 100 does not trigger a shuffle; instead, each of the 100 new partitions claims 10 of the current partitions, and if you ask for more partitions than currently exist, the partition count simply stays the same. However, if you do a drastic coalesce, e.g. down to a single partition, the computation ends up on only a few cluster nodes (in other words, parallelism suffers). To avoid this, pass true as the second (shuffle) argument; the repartitioning then includes an extra shuffle step, which means the upstream partitions can still be processed in parallel.

Note: with shuffle = true you can actually coalesce to more partitions than you started with. For example, if you have a small number of partitions, say 100, calling coalesce(1000, shuffle = true) will produce 1000 partitions spread across the cluster using a HashPartitioner. This is very useful for increasing parallelism.
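The behaviour described above is easy to check with getNumPartitions(). The following is a minimal, self-contained sketch that is not part of the original example; the class name and the data are made up for illustration.

//java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CoalesceBehavior {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("CoalesceBehavior");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 8 elements spread over 8 partitions.
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 8);

        // Narrow coalesce 8 -> 2: no shuffle.
        System.out.println(rdd.coalesce(2).getNumPartitions());        // 2

        // Asking for more partitions without a shuffle is a no-op: stays at 8.
        System.out.println(rdd.coalesce(16).getNumPartitions());       // 8

        // With shuffle = true the partition count can actually grow.
        System.out.println(rdd.coalesce(16, true).getNumPartitions()); // 16

        sc.stop();
    }
}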
Method signatures
//scala
/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 */
def coalesce(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.coalesce(numPartitions))

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 */
def coalesce(numPartitions: Int, shuffle: Boolean): JavaPairRDD[K, V] =
  fromRDD(rdd.coalesce(numPartitions, shuffle))
//java
public JavaPairRDD<K,V> coalesce(int numPartitions)
public JavaPairRDD<K,V> coalesce(int numPartitions, boolean shuffle)
Example
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class Coalesce {
    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.1");
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Spark_DEMO");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Tuple2<String, String>> list = Lists.newArrayList();
        list.add(new Tuple2<String, String>("xiaoming1", "123456"));
        list.add(new Tuple2<String, String>("xiaoming2", "123456"));
        list.add(new Tuple2<String, String>("xiaoming3", "123456"));
        list.add(new Tuple2<String, String>("xiaoming4", "123456"));
        list.add(new Tuple2<String, String>("xiaoming5", "123456"));
        list.add(new Tuple2<String, String>("xiaoming6", "123456"));
        list.add(new Tuple2<String, String>("xiaoming7", "123456"));
        list.add(new Tuple2<String, String>("xiaoming8", "123456"));

        // Create 8 partitions: with 8 elements, each partition holds one pair.
        JavaPairRDD<String, String> javaPairRDD = sc.parallelizePairs(list, 8);
        javaPairRDD.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println("----" + stringStringTuple2);
            }
        });

        // Coalesce into a single partition, with a shuffle.
        JavaPairRDD<String, String> javaPairRDD1 = javaPairRDD.coalesce(1, true);
        javaPairRDD1.foreach(new VoidFunction<Tuple2<String, String>>() {
            public void call(Tuple2<String, String> stringStringTuple2) throws Exception {
                System.out.println("+++++" + stringStringTuple2);
            }
        });
    }
}
Result
19/03/13 20:31:51 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
----(xiaoming1,123456)
19/03/13 20:31:51 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 708 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/03/13 20:31:51 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 536 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, executor driver, partition 2, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
----(xiaoming2,123456)
----(xiaoming3,123456)
19/03/13 20:31:51 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 536 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, executor driver, partition 3, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 187 ms on localhost (executor driver) (1/8)
----(xiaoming4,123456)
19/03/13 20:31:51 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 579 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 31 ms on localhost (executor driver) (2/8)
19/03/13 20:31:51 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, localhost, executor driver, partition 4, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 36 ms on localhost (executor driver) (3/8)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 15 ms on localhost (executor driver) (4/8)
19/03/13 20:31:51 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
----(xiaoming5,123456)
19/03/13 20:31:51 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 622 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, localhost, executor driver, partition 5, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 141 ms on localhost (executor driver) (5/8)
19/03/13 20:31:51 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
----(xiaoming6,123456)
19/03/13 20:31:51 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 536 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, localhost, executor driver, partition 6, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
----(xiaoming7,123456)
19/03/13 20:31:51 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 622 bytes result sent to driver
19/03/13 20:31:51 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 156 ms on localhost (executor driver) (6/8)
19/03/13 20:31:51 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, localhost, executor driver, partition 7, PROCESS_LOCAL, 4893 bytes)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 16 ms on localhost (executor driver) (7/8)
19/03/13 20:31:51 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
19/03/13 20:31:51 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 622 bytes result sent to driver
----(xiaoming8,123456)
19/03/13 20:31:51 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 16 ms on localhost (executor driver) (8/8)
19/03/13 20:31:52 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
+++++(xiaoming1,123456)
+++++(xiaoming2,123456)
+++++(xiaoming3,123456)
+++++(xiaoming4,123456)
+++++(xiaoming5,123456)
+++++(xiaoming6,123456)
+++++(xiaoming7,123456)
+++++(xiaoming8,123456)
From the log, the first stage runs 8 tasks (one per partition) and each prints a single "----" record, so the 8 pairs start out with one pair in each of the 8 partitions; after coalesce(1, true) they are all merged into a single partition, and one task prints the 8 "+++++" records.
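If you want to see this distribution directly rather than inferring it from the log, one possible addition (not in the original post) inside main of the example above is to print the partition contents with glom():

//java
// glom() turns each partition into a list: 8 singleton lists before the
// coalesce, and a single list of 8 pairs afterwards.
System.out.println(javaPairRDD.glom().collect());
System.out.println(javaPairRDD1.glom().collect());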
Discussion
1. Understanding Spark partitions

Spark schedules tasks at the vcore level.

When reading from HDFS, there are as many partitions as there are blocks. For example, suppose Spark SQL reads a table T made up of 10,000 small files: that means 10,000 partitions, and reading will be inefficient. Assume the resources are --executor-memory 2g --executor-cores 2 --num-executors 5. The scheduler first hands files 1-10 (i.e. 10 partitions) to the 5 executors (Spark schedules per vcore, so the 5 executors run 10 tasks reading 10 partitions in parallel). If the 5 executors all ran at the same speed, files 11-20 would then be handed out to them, and so on; in practice the speeds differ, so whichever task finishes first picks up the next partition to read. As a result, the scheduling time often exceeds the time spent actually reading the files, and file handles are constantly opened and closed, wasting precious I/O resources and dragging execution efficiency way down (see the sketch below).
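As a small illustrative sketch (the HDFS path is hypothetical and the JavaSparkContext sc is assumed to exist, as in the Coalesce example above), you can check how many partitions an input actually produces before deciding how to reduce them:

//java
// Hypothetical input directory: with ~10,000 small files this typically yields
// roughly 10,000 partitions, i.e. one read task per file.
JavaRDD<String> lines = sc.textFile("hdfs:///warehouse/T/");
System.out.println("input partitions: " + lines.getNumPartitions());

// Narrow coalesce down to executors x cores = 10 read groups, without a shuffle,
// so each task reads many small files instead of one.
System.out.println("after coalesce(10): " + lines.coalesce(10).getNumPartitions());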
2. The difference between coalesce and repartition (below, coalesce always refers to the default shuffle = false case)

repartition(numPartitions: Int): RDD[T] vs coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]

repartition is simply coalesce called with shuffle = true.

Sticking with the example above:

There are 10,000 small files and the resources are again --executor-memory 2g --executor-cores 2 --num-executors 5. repartition(4) produces a shuffle: the 5 executors read the 10,000 partitions in turn as described earlier, then each record is routed by some rule (e.g. % 4) into one of 4 output files, so the 4 resulting files are essentially unordered but fairly even. coalesce(4) produces no shuffle: so how do 5 executors generate 4 files without shuffling? In fact 1, 2, 3 or even more executors simply sit idle (exactly how many depends on Spark's scheduling, on data locality and on cluster load) and read no data at all! See the sketch below.
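Here is a small sketch of that difference (the data and variable names are illustrative, and the JavaSparkContext sc from the Coalesce example above is assumed): the lineage printed by toDebugString contains a ShuffledRDD only in the repartition case.

//java
JavaRDD<Integer> src = sc.parallelize(
        Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 10);

// repartition(4) is coalesce(4, shuffle = true): the lineage shows a ShuffledRDD.
System.out.println(src.repartition(4).toDebugString());

// coalesce(4) without a shuffle: a narrow dependency, no ShuffledRDD in the lineage.
System.out.println(src.coalesce(4).toDebugString());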
PS:
1. If the result must contain more files than the source RDD has partitions, coalesce cannot do it. For example, with 4 small files (4 partitions) you cannot produce 5 files through coalesce; without a shuffle the number of files (partitions) cannot grow.
2. If you have only 1 executor (1 core), the source RDD has 5 partitions and you use coalesce to produce 2 files, the source partitions are pre-assigned to the output partitions: for example partitions 0-2 are processed first on that executor, then partitions 3-4 run on the same executor. It is all one executor reading different data serially, which differs noticeably from repartition(2), where the executor would read partitions 0-4 serially in order and route each record with % 2.
3. Example:

Table T holds 10 GB of data in 100 partitions, and the resources are again --executor-memory 2g --executor-cores 2 --num-executors 5. We want the result to be a single file.

1. Using coalesce: sql(select * from T).coalesce(1). Four of the 5 executors sit idle and only one actually reads and processes the data, which is extremely inefficient. So use coalesce with care; it can also cause OOM problems, which we will cover another time.
2. Using repartition: sql(select * from T).repartition(1). This is much more efficient: all 5 executors run in parallel (10 tasks), the data is then shuffled onto a single node and written out as one file. Now, what if I wanted 10 files instead of one? Then coalesce becomes more efficient than repartition again, because coalesce involves no shuffle: each output task simply claims about 10 of the source partitions.
And what if I don't want exactly 10 files either? In practice, once the number of files to produce exceeds executors x vcores, coalesce is usually the more efficient choice (usually, not absolutely). A hedged sketch of the single-file case follows.
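Below is a hedged sketch of the two single-output-file approaches (the SparkSession spark, the output paths and the Parquet format are assumptions added for illustration; table T is the one discussed above):

//java
Dataset<Row> df = spark.sql("select * from T");

// coalesce(1): no shuffle, so the whole scan collapses into one task -- slow here.
df.coalesce(1).write().mode("overwrite").parquet("/tmp/t_coalesce_1");

// repartition(1): the scan keeps its full parallelism, then one shuffle funnels
// everything into a single partition before the write -- usually faster here.
df.repartition(1).write().mode("overwrite").parquet("/tmp/t_repartition_1");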
4. Summary:

We tend to assume that coalesce, which avoids a shuffle, must be more efficient than repartition, which performs one; in practice it depends on the concrete case. coalesce is not always faster and sometimes hides big pitfalls, so use it with care.

Both coalesce and repartition re-divide an RDD's partitions, and repartition is simply coalesce with shuffle = true. (Assume the source RDD has N partitions and we want M partitions.)

1) If N < M: the N partitions usually hold unevenly distributed data, and a HashPartitioner is used to redistribute it into M partitions; this requires shuffle = true (use repartition; coalesce cannot do it).

2) If N > M and N and M are of the same order (say N is 1000 and M is 100): several source partitions can be merged into each new partition until M partitions remain, so shuffle can be set to false (use coalesce). If M > N, coalesce has no effect: without a shuffle the parent and child RDDs have a narrow dependency, and the number of files (partitions) cannot grow.

In short, with shuffle = false, if the requested number of partitions is larger than the current one, the RDD's partition count stays unchanged; without a shuffle you simply cannot increase an RDD's partition count.

3) If N > M and they differ drastically: compare the executor count with the number of partitions to be produced. If the executor count is <= the target partition count, coalesce is efficient; otherwise coalesce leaves (executor count - target partition count) executors idle and hurts efficiency. And when M = 1, you can set shuffle to true so that the operations before the coalesce still run with good parallelism.
Reference blogs:
[spark partition 理解 / coalesce 与 repartition的区别](https://www.cnblogs.com/jiangxiaoxian/p/9539760.html)
[Spark 重分区函数:coalesce和repartition区别](https://www.jianshu.com/p/ee069b6bc273)