-
RDD:
RDD就是一个不可变的分布式对象集合,是Spark对数据的核心抽象。每个RDD都被分为多个分区,每个分区就是一个数据集片段,这些分区运行在集群中的不同节点上。
RDD提供了一种高度受限的内存共享模型,即RDD是只读的,只能基于稳定的物理储存中的数据集来创建RDD或对已有的RDD进行转换操作来得到新的RDD。
-
DataFrame:
DataFrame是用在Spark SQL中的一种存放Row对象的特殊RDD,是Spark SQL中的数据抽象,它是一种结构化的数据集,每一条数据都由几个命名字段组成(类似与传统数据库中的表),DataFrame能够利用结构信息更加高效的存储数据。同时, Spark SQL为DataFrame提供了非常好用的API,而且还能注册成表使用sql来操作。
DataFrame可以从外部数据源创建,也可以从查询结果或已有的RDD创建。
-
Dataset:
Dataset是Spark1.6开始提供的API,是Spark SQL最新的数据抽象。它把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎结合到了一起。
Dataset可以从JVM对象创建得到,而且可以像DataFrame一样使用API或sql来操作。
三者的关系:
- RDD + Schema = DataFrame = Dataset[Row]
注:RDD是Spark的核心,DataFrame/Dataset是Spark SQL的核心,RDD不支持sql操作。
三者的优缺点总结:
-
RDD 优点:
- JVM对象组成的分布式数据集合
- 不可变并且有容错能力
- 可处理结构化和非结构化的数据
- 支持函数式转换
-
RDD缺点:
- 没有Schema
- 用户自己优化程序
- 从不同的数据源读取数据非常困难
- 合并多个数据源中的数据也非常困难
-
DataFrame的优点:
- Row对象组成的分布式数据集
- 不可变并且有容错能力
- 处理结构化数据
- 自带优化器Catalyset,可自动优化程序
- DataFrame让Spark对结构化数据有了处理能力
-
DataFrame的缺点:
- 编译时不能类型转化进行安全检查,运行时才能确定是否有问题
- 对于对象支持不友好,RDD内部数据直接以Java对象存储,DataFrame内存存储的是Row对象而不能是自定义对象
-
DateSet的优点:
- DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据
- 和RDD一样,支持自定义对象存储
- 和DataFrame一样,支持结构化数据的sql查询
- 采用堆外内存存储,gc友好
- 类型转化安全,代码友好
三者之间的相互转换:
在Spark生态系统中,其他所有的框架都运行在Spark Core之上,而Spark Core的唯一组件就是RDD,Spark处理数据的场景都是建立在统一的数据抽象RDD上,所以Spark SQL对接Spark Core的方式就是将DataFrame/Dataset转化成RDD,并且三者之间支持相互转换。
- RDD—>DataFrame (DataFrame的创建&操作-这篇文章也有总结)
// 创建RDD
val userRDD = sc.textFile("hdfs:///temp/data/user.csv")
.map(_.split(","))
.map(arr => arr(0), arr(1), arr(2))
// 转化成DataFrame
val userDF = userRDD.toDF("user_id", "user_name", "user_gender")
- DataFrame—>RDD
val userRDD = userDF.rdd
- RDD—>Dataset
// 定义一个case class类
case class user (user_id: String, user_name: String, user_gender: Int)
// 创建RDD
val userRDD = sc.textFile("hdfs:///temp/data/user/csv")
.map(_.split(","))
// 转换成Dataset
val userDS = userRDD.map(arr => user(arr(0), arr(1), arr(2))).toDS()
- Dataset—>RDD
val userRDD = userDS.rdd
- DataFrame—>Dataset
// 定义一个case class类
case class user (user_id: String, user_name: String, user_gender: Int)
// 创建DataFrame,直接从Hive中取
val userDF = spark.table("default.userdf")
// 转换成Dataset
val userDS = userDF.as[user]
- Dataset—>DataFarme
val userDF = userDS.toDF()