当前位置: 代码迷 >> SQL >> spark sql根本使用方法介绍
  详细解决方案

spark sql根本使用方法介绍

热度:307   发布时间:2016-05-05 09:48:26.0
spark sql基本使用方法介绍

spark中可以通过spark sql 直接查询hive或impala中的数据,

?

一、启动方法

/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10? --executor-memory 1g --executor-cores? 2

?

注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径

?

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项

?

--master MASTER_URL?????????? 指定master url

--executor-memory MEM????????? 每个executor的内存,默认为1G

--total-executor-cores NUM?????? 所有executor的总核数

-e <quoted-query-string>?????????? 直接执行查询SQL

?

-f <filename>????????????????????????????? 以文件方式批量执行SQL

?

二、Spark sql对hive支持的功能

?

1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY

2、hive操作运算:

???? 1) 关系运算:=?==,?<>,?<,?>,?>=,?<=

???? 2) 算术运算:+,?-,?*,?/,?%

???? 3) 逻辑运算:AND,?&&,?OR,?||

???? 4) 复杂的数据结构

???? 5) 数学函数:(sign,?ln,?cos, etc)

???? 6) 字符串函数:

3、 UDF

4、 UDAF

?

5、 用户定义的序列化格式

6、join操作:JOIN???? {LEFT|RIGHT|FULL} OUTER JOIN???? LEFT SEMI JOIN??? CROSS JOIN

7、 unions操作:

8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2

9、Sampling?

10、 Explain

11、? 分区表

12、 视图

13、? hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE?????

?

14、 支持的数据类型:TINYINT? SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE? ARRAY MAP STRUCT

?

?

三、Spark sql 在客户端编程方式进行查询数据

1、启动spark-shell

./spark-shell --master spark://master:7077 --total-executor-cores 10? --executor-memory 1g? --executor-cores? 2

2、编写程序

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("../examples/src/main/resources/people.json")

查看所有数据:df.show()

查看表结构:df.printSchema()

只看name列:df.select("name").show()

对数据运算:df.select(df("name"), df("age") + 1).show()

过滤数据:df.filter(df("age") > 21).show()

?

分组统计:df.groupBy("age").count().show()

?

1、查询txt数据

import sqlContext.implicits._

case class Person(name: String, age: Int)

val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

2、parquet文件

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

3、hdfs文件

?

val df = sqlContext.read.load("hdfs://namenode.hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")

4、保存查询结果数据

val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

?

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)

?

四、Spark sql性能调优

?

缓存数据表:sqlContext.cacheTable("tableName")

?

取消缓存表:sqlContext.uncacheTable("tableName")

?

?

spark.sql.inMemoryColumnarStorage.compressedtrue 当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。

spark.sql.inMemoryColumnarStorage.batchSize 10000 柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险

?

  相关解决方案