当前位置: 代码迷 >> 综合 >> SparkSQL| RDDDataFrameDataSet
  详细解决方案

SparkSQL| RDDDataFrameDataSet

热度:75   发布时间:2024-01-12 00:03:41.0
  • RDD:

RDD就是一个不可变的分布式对象集合,是Spark对数据的核心抽象。每个RDD都被分为多个分区,每个分区就是一个数据集片段,这些分区运行在集群中的不同节点上。
RDD提供了一种高度受限的内存共享模型,即RDD是只读的,只能基于稳定的物理储存中的数据集来创建RDD或对已有的RDD进行转换操作来得到新的RDD。

  • DataFrame:

DataFrame是用在Spark SQL中的一种存放Row对象的特殊RDD,是Spark SQL中的数据抽象,它是一种结构化的数据集,每一条数据都由几个命名字段组成(类似与传统数据库中的表),DataFrame能够利用结构信息更加高效的存储数据。同时, Spark SQL为DataFrame提供了非常好用的API,而且还能注册成表使用sql来操作。
DataFrame可以从外部数据源创建,也可以从查询结果或已有的RDD创建。

  • Dataset:

Dataset是Spark1.6开始提供的API,是Spark SQL最新的数据抽象。它把RDD的优势(强类型,可以使用lambda表达式函数)和Spark SQL的优化执行引擎结合到了一起。
Dataset可以从JVM对象创建得到,而且可以像DataFrame一样使用API或sql来操作。


三者的关系:

  • RDD + Schema = DataFrame = Dataset[Row]
    注:RDD是Spark的核心,DataFrame/Dataset是Spark SQL的核心,RDD不支持sql操作。

三者的优缺点总结:

  • RDD 优点:

    • JVM对象组成的分布式数据集合
    • 不可变并且有容错能力
    • 可处理结构化和非结构化的数据
    • 支持函数式转换
  • RDD缺点:

    • 没有Schema
    • 用户自己优化程序
    • 从不同的数据源读取数据非常困难
    • 合并多个数据源中的数据也非常困难
  • DataFrame的优点:

    • Row对象组成的分布式数据集
    • 不可变并且有容错能力
    • 处理结构化数据
    • 自带优化器Catalyset,可自动优化程序
    • DataFrame让Spark对结构化数据有了处理能力
  • DataFrame的缺点:

    • 编译时不能类型转化进行安全检查,运行时才能确定是否有问题
    • 对于对象支持不友好,RDD内部数据直接以Java对象存储,DataFrame内存存储的是Row对象而不能是自定义对象
  • DateSet的优点:

    • DateSet整合了RDD和DataFrame的优点,支持结构化和非结构化数据
    • 和RDD一样,支持自定义对象存储
    • 和DataFrame一样,支持结构化数据的sql查询
    • 采用堆外内存存储,gc友好
    • 类型转化安全,代码友好

三者之间的相互转换:

在Spark生态系统中,其他所有的框架都运行在Spark Core之上,而Spark Core的唯一组件就是RDD,Spark处理数据的场景都是建立在统一的数据抽象RDD上,所以Spark SQL对接Spark Core的方式就是将DataFrame/Dataset转化成RDD,并且三者之间支持相互转换。

  • RDD—>DataFrame (DataFrame的创建&操作-这篇文章也有总结)
// 创建RDD
val userRDD = sc.textFile("hdfs:///temp/data/user.csv")
.map(_.split(","))
.map(arr => arr(0), arr(1), arr(2))
// 转化成DataFrame
val userDF = userRDD.toDF("user_id", "user_name", "user_gender")
  • DataFrame—>RDD

 

val userRDD = userDF.rdd
  • RDD—>Dataset
// 定义一个case class类
case class user (user_id: String, user_name: String, user_gender: Int)
// 创建RDD
val userRDD = sc.textFile("hdfs:///temp/data/user/csv")
.map(_.split(","))
// 转换成Dataset
val userDS = userRDD.map(arr => user(arr(0), arr(1), arr(2))).toDS()
  • Dataset—>RDD
val userRDD = userDS.rdd
  • DataFrame—>Dataset
// 定义一个case class类
case class user (user_id: String, user_name: String, user_gender: Int)
// 创建DataFrame,直接从Hive中取
val userDF = spark.table("default.userdf")
// 转换成Dataset
val userDS = userDF.as[user]
  • Dataset—>DataFarme
val userDF = userDS.toDF()