当前位置: 代码迷 >> 综合 >> SparkSQL 基本使用
  详细解决方案

SparkSQL 基本使用

热度:98   发布时间:2023-12-12 03:45:23.0

简介

  • SparkSQL可以直接使用SQL的方式处理结构化数据,也可以通过DataFrame(spark1.3)和Dataset(spark1.6) API 使用编程的方式处理结构化数据,本文只介绍以DataFrame API的方式编程,至于DataFrame 和Daraset有何不同,我们暂时将二者先理解为一个概念,DataFrame API支持多种编程语言,如Java,Scala,Python,R,这就意味着开发人员可以采用自己熟悉的语言编程。

SparkSession

  • Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户可以使用DataFrame和Dataset的各种API,在Spark2.0版本之前,创建一个Spark应用程序就必须要使用一个SparkConf和SparkContext,SparkSession封装了SparkConf、SparkContext和SQLContext,因此创建一个以SparkSQL编程的应用程序,只需要创建一个SparkSession。
  • 创建SparkSession

    import org.apache.spark.sql.SparkSessionval spark = SparkSession
    .builder()
    .appName("Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .enableHiveSupport() #支持hive的一些特性,如udf,序列化与反序列化
    .getOrCreate()// For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._   #使用隐式转换
    

DataFrame

  • DataFrame 可以理解为具有Schema结构的RDD,并采用列式存储,他相对于RDD,可以从数据中读取到更多信息,因此可以做更多的优化.
  • 那么怎样将数据转成一个DataFrame呢? DataFrame API 对具有Schema信息的数据提供了format()和load()方法可以将不同格式和内部数据源的数据加载为DataFrame;当然,对外部数据源,DataFrame也提供了很好的支持,并且可以将非结构化数据以RDD的方式进行转换成DataFrame。
eg:读取json文件
    val peopledf=spark.read.format("json").load("file:///data/people.json")peopledf.show #这里的show方法默认显示前20行数据+----+-------+| age|   name|+----+-------+|null|Michael||  30|   Andy||  19| Justin|+----+-------+第一句也可以简写为:val peopledf=spark.read.json("file:///data/people.json")
  • printSchema()

     peopledf.printSchema()
    root|-- age: long (nullable = true)|-- name: string (nullable = true)列出了他的Schema结构(Tree)
    
  • select()

     peopledf.select("name").show
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    peopledf.select("name","age").show 
    +-------+----+
    |   name| age|
    +-------+----+
    |Michael|null|
    |   Andy|  30|
    | Justin|  19|
    +-------+----+
    peopledf.select(df.col("name")).show
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    peopledf.select('name).show  # 此方式需要import spark.implicits_ 导入隐式转换
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    peopledf.select($"name",$"age"+10).show
    +-------+----------+
    |   name|(age + 10)|
    +-------+----------+
    |Michael|      null|
    |   Andy|        40|
    | Justin|        29|
    +-------+----------+
    
  • filter()

    peopledf.filter($"age">21).show
    +---+----+
    |age|name|
    +---+----+
    | 30|Andy|
    +---+----+
    
  • groupBy()

    peopledf.groupBy("age").count().show 
    +----+-----+
    | age|count|
    +----+-----+
    |  19|    1|
    |null|    1|
    |  30|    1|
    +----+-----+
    
  • show()

    show(num,truncate) 传入俩个参数,第一个控制显示行数,第二个控制是否显示全部信息
    
在DataFrame中使用SQL编程
  • SparkSession中的sql()函数可以让应用程序以编程的方式运行SQL查询语句,让结果返回一个DataFrame。

    // Register the DataFrame as a SQL temporary view
    peopledf.createOrReplaceTempView("people")
    val sqldf = spark.sql("select * from people")
    sqlDF.show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|Michael|
    // |  30|   Andy|
    // |  19| Justin|
    // +----+-------+
    

RDD与DataFrame的转换

方式一:Using Reflection
    val spark = SparkSession.builder().appName("transferRDD").master("lccal[2]").getOrCreate()import spark.implicits._   # 导入隐式转换val dfRDD = spark.sparkContext.textFile("").map(x =>x.split(",")).map(x=>Person(x(0),x(1).trim.toInt)).toDF() # 将RDD每个元素作用为case classspark.stop()}}case class Person(name:String,age:Int) #定义一个case class,在scala 2.10及之前版本只支持22个字段
方式二:Programmatically Specifying the Schema
    val spark = SparkSession.builder().appName("").master("").getOrCreate()step1: #定义schema信息val schemaString = "name,age"val schemaType = schemaString.split(",").map(x=> StructField(x,StringType,nullable = true))val schema=StructType(schemaType)step2: #将RDD转为rowRDDval RDD = spark.sparkContext.textFile("")val rowRDD = RDD.map(_.split(",")).map(x=>Row(x(0),x(1).trim))step3: #使用rowRDD和schema信息创建dataFrameval RDDdf = spark.createDataFrame(rowRDD,schema)