当前位置: 代码迷 >> 综合 >> spark封神之路(12)-RDD checkpoint
  详细解决方案

spark封神之路(12)-RDD checkpoint

热度:53   发布时间:2023-11-30 10:23:53.0

1 checkpoint介绍

把rdd中的数据,直接存储到hdfs中或者本机目录中。

适用场景: 非常非常非常不容易得到的数据。迭代次数非常多的数据

  • SQL Server 数据库引擎可以在意外关闭或崩溃后从恢复期间开始应用日志中包含的更改

  • HDFS的元数据管理的时候我们提到过checkpoint机制 , 定期更新元数据的一种策略

所以你可以简单理解成 Checkpoint 是用来容错的,当错误发生的时候,可以迅速恢复的一种机制A checkpoint creates a known good point from which the SQL Server Database Engine can start applying changes contained in the log during recovery after an unexpected shutdown or crash.

回到 Spark 上,尤其在流式计算里,需要高容错的机制来确保程序的稳定和健壮。从源码中看看,在 Spark 中,源码可以在 Streaming 包中的 Checkpoint。

当一个计算有上百个甚至更多个rdd的时候,如果前20个rdd的计算结果重复被使用,这个时候我们就可以使用checkpoint将前20个rdd的计算结果缓存到本地。大大提高运算效率,

当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain,如果中间某些中间结果RDD,后面需要反复使用该数据,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制!

2 如何使用

首先需要调用sparkContext的setCheckpointDir(“path”),设置checkpoint保存的路径,比如hdfs路径

对RDD调用checkpoint方法

当rdd调用action的时候,启动一个job计算rdd,当此RDD所处的job运行结束后,会启动一个单独的job,来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。

def main(args: Array[String]): Unit = {val sc: SparkContext = SparkUtil.getScval rdd1: RDD[String] = sc.textFile("d://word.txt")val rdd2: RDD[String] = rdd1.flatMap(_.split("\\s+"))val rdd3: RDD[String] = rdd2.map(e => {println("---------------------")e})val rdd4: RDD[(String, Int)] = rdd3.map(word => {(word, 1)})// 在进行RDD的检查点操作以前先设置检查点的路径 , 实际生产环境下将目录设置成HDFS目录 , 会自动创建sc.setCheckpointDir("ck/data")//设置RDD的检查点 ,当触发行动算子的时候会将RDD的数据缓存在指定的目录下 , 删除原来的RDD依赖关系rdd4.checkpoint()val res1: RDD[(String, Int)] = rdd4.reduceByKey(_ + _)res1.foreach(println)val res2: RDD[(String, Iterable[Int])] = rdd4.groupByKey()res2.foreach(println)sc.stop()}

1, 必须要对SparkContext设置 checkPointDir, 会自动创建目录

2, checkpoint 是lazy执行的,当调用action算子时,才真正的执行checkpoint

3, checkpoint会产生2个job。第一个job,执行业务逻辑;第二个job,把rdd中的产生数据写入到hdfs目录下。

4, 某一个rdd被checkpoint之后,原有的rdd依赖关系被删除了。取而代之的是CheckpointRDD

5, 基于被checkpoint的rdd之上的所有操作,依赖关系都是从checkpointRDD开始,数据都从目录中读取。

6, checkpoint 和cache同时使用 先cache再checkpoint

 

3 缓存和检查点的区别

  • cache :只能将数据临时存储在内存中进行数据重用

  • persist :将数据临时存储在磁盘文件中进行数据重用 涉及到磁盘I0,性能较低,但是数据安全 如果作业执行完毕,临时保存的数据文件就会丢失

  • 上面的两个函数 会在血缘关系中添加新的依赖。一旦出现问题,可以重头读取数据

  • checkpoint :将数据长久地保存在磁盘文件中进行数据重用 1) 为了保证数据安全,在执行checkpoint的时候会生成两个Job,一个专门负责持久化的Job ,所以数据会加载两次 2) 为了能够提高效率,一般情况下,是需要和cache联合使用 3) 执行过程中,会切断血缘关系。重新建立新的血缘关系,改变数据源为检查点目录

 println(rdd4.toDebugString)sc.setCheckpointDir("ck/data")//设置RDD的检查点 ,当触发行动算子的时候会将RDD的数据缓存在指定的目录下 , 删除原来的RDD依赖关系rdd4.checkpoint()rdd4.collect()// 在行动算子后面调用println(rdd4.toDebugString)(2) MapPartitionsRDD[4] at map at Demo3.scala:22 []|  MapPartitionsRDD[3] at map at Demo3.scala:18 []|  MapPartitionsRDD[2] at flatMap at Demo3.scala:17 []|  d://word.txt MapPartitionsRDD[1] at textFile at Demo3.scala:16 []|  d://word.txt HadoopRDD[0] at textFile at Demo3.scala:16 []
(2) MapPartitionsRDD[4] at map at Demo3.scala:22 []|  ReliableCheckpointRDD[7] at collect at Demo3.scala:33 []-- 仅仅将RDD缓存起来 不做checkpoint 依赖关系是这样的
(2) MapPartitionsRDD[4] at map at Demo3.scala:22 [Memory Deserialized 1x Replicated]|       CachedPartitions: 2; MemorySize: 4.0 KiB; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B|  MapPartitionsRDD[3] at map at Demo3.scala:18 [Memory Deserialized 1x Replicated]|  MapPartitionsRDD[2] at flatMap at Demo3.scala:17 [Memory Deserialized 1x Replicated]|  d://word.txt MapPartitionsRDD[1] at textFile at Demo3.scala:16 [Memory Deserialized 1x Replicated]|  d://word.txt HadoopRDD[0] at textFile at Demo3.scala:16 [Memory Deserialized 1x Replicated]

使用方式

sc.setCheckpointDir("ck/data")
rdd4.cache()  // 先将数据缓存起来 , 避免两次加载数据 提升效率
rdd4.checkpoint()

 

  相关解决方案