简介
- SparkSQL可以直接使用SQL的方式处理结构化数据,也可以通过DataFrame(spark1.3)和Dataset(spark1.6) API 使用编程的方式处理结构化数据,本文只介绍以DataFrame API的方式编程,至于DataFrame 和Daraset有何不同,我们暂时将二者先理解为一个概念,DataFrame API支持多种编程语言,如Java,Scala,Python,R,这就意味着开发人员可以采用自己熟悉的语言编程。
SparkSession
- Spark2.0中引入了SparkSession的概念,它为用户提供了一个统一的切入点来使用Spark的各项功能,用户可以使用DataFrame和Dataset的各种API,在Spark2.0版本之前,创建一个Spark应用程序就必须要使用一个SparkConf和SparkContext,SparkSession封装了SparkConf、SparkContext和SQLContext,因此创建一个以SparkSQL编程的应用程序,只需要创建一个SparkSession。
创建SparkSession
import org.apache.spark.sql.SparkSessionval spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .enableHiveSupport() #支持hive的一些特性,如udf,序列化与反序列化 .getOrCreate()// For implicit conversions like converting RDDs to DataFrames import spark.implicits._ #使用隐式转换
DataFrame
- DataFrame 可以理解为具有Schema结构的RDD,并采用列式存储,他相对于RDD,可以从数据中读取到更多信息,因此可以做更多的优化.
- 那么怎样将数据转成一个DataFrame呢? DataFrame API 对具有Schema信息的数据提供了format()和load()方法可以将不同格式和内部数据源的数据加载为DataFrame;当然,对外部数据源,DataFrame也提供了很好的支持,并且可以将非结构化数据以RDD的方式进行转换成DataFrame。
eg:读取json文件
val peopledf=spark.read.format("json").load("file:///data/people.json")peopledf.show #这里的show方法默认显示前20行数据+----+-------+| age| name|+----+-------+|null|Michael|| 30| Andy|| 19| Justin|+----+-------+第一句也可以简写为:val peopledf=spark.read.json("file:///data/people.json")
printSchema()
peopledf.printSchema() root|-- age: long (nullable = true)|-- name: string (nullable = true)列出了他的Schema结构(Tree)
select()
peopledf.select("name").show +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ peopledf.select("name","age").show +-------+----+ | name| age| +-------+----+ |Michael|null| | Andy| 30| | Justin| 19| +-------+----+ peopledf.select(df.col("name")).show +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ peopledf.select('name).show # 此方式需要import spark.implicits_ 导入隐式转换 +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ peopledf.select($"name",$"age"+10).show +-------+----------+ | name|(age + 10)| +-------+----------+ |Michael| null| | Andy| 40| | Justin| 29| +-------+----------+
filter()
peopledf.filter($"age">21).show +---+----+ |age|name| +---+----+ | 30|Andy| +---+----+
groupBy()
peopledf.groupBy("age").count().show +----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+
show()
show(num,truncate) 传入俩个参数,第一个控制显示行数,第二个控制是否显示全部信息
在DataFrame中使用SQL编程
SparkSession中的sql()函数可以让应用程序以编程的方式运行SQL查询语句,让结果返回一个DataFrame。
// Register the DataFrame as a SQL temporary view peopledf.createOrReplaceTempView("people") val sqldf = spark.sql("select * from people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
RDD与DataFrame的转换
方式一:Using Reflection
val spark = SparkSession.builder().appName("transferRDD").master("lccal[2]").getOrCreate()import spark.implicits._ # 导入隐式转换val dfRDD = spark.sparkContext.textFile("").map(x =>x.split(",")).map(x=>Person(x(0),x(1).trim.toInt)).toDF() # 将RDD每个元素作用为case classspark.stop()}}case class Person(name:String,age:Int) #定义一个case class,在scala 2.10及之前版本只支持22个字段
方式二:Programmatically Specifying the Schema
val spark = SparkSession.builder().appName("").master("").getOrCreate()step1: #定义schema信息val schemaString = "name,age"val schemaType = schemaString.split(",").map(x=> StructField(x,StringType,nullable = true))val schema=StructType(schemaType)step2: #将RDD转为rowRDDval RDD = spark.sparkContext.textFile("")val rowRDD = RDD.map(_.split(",")).map(x=>Row(x(0),x(1).trim))step3: #使用rowRDD和schema信息创建dataFrameval RDDdf = spark.createDataFrame(rowRDD,schema)