本节来学习pyspark.sql中的Grouped_Data类型的函数。博客中代码基于spark 2.4.4版本。不同版本函数会有不同,详细请参考官方文档。博客案例中用到的数据可以点击此处下载(提取码:h6gg)
GroupedData(jgd,df)
是由DataFrame.groupBy()创建的一组在DataFrame上聚合的方法
from pyspark.sql import SparkSession
import pyspark.sql.types as typspark = SparkSession.Builder().master('local').appName('GroupedData').getOrCreate()
input_types = [('Opponent', typ.StringType()),('WinOrLose', typ.StringType()),('HomeAndAway', typ.StringType()),('Hit', typ.IntegerType()),('Shots', typ.IntegerType()),('FieldGoalPercentage', typ.FloatType()),('3PointShooting', typ.FloatType()),('Backboard', typ.IntegerType()),('Assist', typ.IntegerType()),('Score', typ.IntegerType()),]schema = typ.StructType([typ.StructField(e[0], e[1], True) for e in input_types])df = spark.read.csv('../data/dat.csv', header=True, schema=schema)
df.printSchema()
root|-- Opponent: string (nullable = true)|-- WinOrLose: string (nullable = true)|-- HomeAndAway: string (nullable = true)|-- Hit: integer (nullable = true)|-- Shots: integer (nullable = true)|-- FieldGoalPercentage: float (nullable = true)|-- 3PointShooting: float (nullable = true)|-- Backboard: integer (nullable = true)|-- Assist: integer (nullable = true)|-- Score: integer (nullable = true)
?
agg(*exprs)
计算聚合并将结果作为DataFrame返回。
可用的聚合函数可以是:
- 内置的汇总函数,例如avg,max,min,sum,count
- 使用pyspark.sql.functions.pandas_udf()创建的集合聚合pandas UDF
如果exprs是从字符串到字符串的单个dict映射,则键是要对其执行聚合的列,而值是聚合函数。
另外,exprs也可以是聚合列表达式的列表
1. 内置函数
grouped_df = df.groupby('WinOrLose')
grouped_df.agg({
'3PointShooting': 'max', 'Shots': 'avg', 'Assist': 'count', 'Score': 'sum'}).show()
+---------+----------+------------------+-------------------+-------------+
|WinOrLose|sum(Score)| avg(Shots)|max(3PointShooting)|count(Assist)|
+---------+----------+------------------+-------------------+-------------+
| 负| 109| 21.25| 0.429| 4|
| 胜| 692|21.142857142857142| 0.875| 21|
+---------+----------+------------------+-------------------+-------------+
?
from pyspark.sql import functions as Fgrouped_df.agg(F.max(df['3PointShooting']).alias('3PointShooting_max'), F.mean(df['Shots']).alias('Shots_mean'), F.count(df['Assist']).alias('Assist_count'), F.sum(df['Score']).alias('Score_sum')).show()
+---------+------------------+------------------+------------+---------+
|WinOrLose|3PointShooting_max| Shots_mean|Assist_count|Score_sum|
+---------+------------------+------------------+------------+---------+
| 负| 0.429| 21.25| 4| 109|
| 胜| 0.875|21.142857142857142| 21| 692|
+---------+------------------+------------------+------------+---------+
?
2. pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def min_score(v):return v.min()grouped_df.agg(min_score(df['Score']).alias('min_score')).show()
+---------+---------+
|WinOrLose|min_score|
+---------+---------+
| 负| 20|
| 胜| 21|
+---------+---------+
?
apply(udf)
pyspark.sql.GroupedData.applyInPandas()的别名。
它需要一个pyspark.sql.functions.pandas_udf(),使用pandas udf映射当前DataFrame的每组,并将结果作为DataFrame返回
注意:Grouped_Data.apply()中的udf函数只能是GROUPED_MAP类型
import pandas as pd
from pyspark.sql.functions import col@pandas_udf(returnType='WinOrLose string , Shots float, Score int' , functionType=PandasUDFType.GROUPED_MAP)
def normalize_udf(pdf):Shots = pdf['Shots']Score = pdf['Score']pdf.assign(Shots_Standard=(Shots - Shots.mean()) / Shots.std())pdf.assign(Score_Standard=(Score - Score.mean()) / Score.std())return pdf df.select('WinOrLose', 'Shots', 'Score').groupby('WinOrLose').apply(normalize_udf).show()
+---------+-----+-----+
|WinOrLose|Shots|Score|
+---------+-----+-----+
| 负| 20.0| 22|
| 负| 19.0| 20|
| 负| 21.0| 29|
| 负| 25.0| 38|
| 胜| 23.0| 27|
| 胜| 21.0| 27|
| 胜| 19.0| 29|
| 胜| 20.0| 27|
| 胜| 18.0| 27|
| 胜| 23.0| 31|
| 胜| 15.0| 29|
| 胜| 25.0| 56|
| 胜| 21.0| 35|
| 胜| 25.0| 38|
| 胜| 21.0| 26|
| 胜| 22.0| 48|
| 胜| 20.0| 29|
| 胜| 16.0| 21|
| 胜| 27.0| 37|
| 胜| 20.0| 37|
+---------+-----+-----+
only showing top 20 rows
?
pivot(pivot_col, values=None)
旋转当前DataFrame的一列并执行指定的聚合。
- pivot_col: 用来povit的列名
- values:值列表,这些值将转换为输出DataFrame中的列
grouped_df.pivot(pivot_col='HomeAndAway').count().show()
+---------+---+---+
|WinOrLose| 主| 客|
+---------+---+---+
| 负| 3| 1|
| 胜| 9| 12|
+---------+---+---+
?
applyInPandas(func, schema)
pyspark.sql.GroupedData.applyInPandas()需要一个Python函数。推荐使用pyspark.sql.GroupedData.applyInPandas() API
使用pandas udf映射当前DataFrame的每个组,并将结果作为DataFrame返回。注意:spark 3.0才有此函数。
- func:一个Python原生函数,采用pandas.DataFrame并输出pandas.DataFrame。
- schema: PySpark中的func的返回类型。 该值可以是pyspark.sql.types.DataType对象或DDL格式的类型字符串
avg(*cols)
对每个分组中的每个数值列计算平均值。别名mean()
grouped_df.avg('Hit', 'Assist', 'Score').show()
+---------+------------------+-----------------+-----------------+
|WinOrLose| avg(Hit)| avg(Assist)| avg(Score)|
+---------+------------------+-----------------+-----------------+
| 负| 7.5| 8.5| 27.25|
| 胜|10.238095238095237|9.666666666666666|32.95238095238095|
+---------+------------------+-----------------+-----------------+
?
count()
计算每个分组的记录数
grouped_df.count().show()
+----+-----+
|胜负|count|
+----+-----+
| 负| 4|
| 胜| 21|
+----+-----+
?
min(*cols)
计算每个分组中每个数值列的最小值
grouped_df.min('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|min(Hit)|min(Assist)|min(Score)|
+---------+--------+-----------+----------+
| 负| 6| 7| 20|
| 胜| 6| 3| 21|
+---------+--------+-----------+----------+
?
max(*cols)
计算每个分组中每个数值列的最大值
grouped_df.max('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|max(Hit)|max(Assist)|max(Score)|
+---------+--------+-----------+----------+
| 负| 8| 11| 38|
| 胜| 19| 17| 56|
+---------+--------+-----------+----------+
?
mean(*cols)
等价于avg()
grouped_df.mean('Hit', 'Assist', 'Score').show()
+---------+------------------+-----------------+-----------------+
|WinOrLose| avg(Hit)| avg(Assist)| avg(Score)|
+---------+------------------+-----------------+-----------------+
| 负| 7.5| 8.5| 27.25|
| 胜|10.238095238095237|9.666666666666666|32.95238095238095|
+---------+------------------+-----------------+-----------------+
?
sum(*cols)
计算每个分组中每个数值列的和
grouped_df.sum('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|sum(Hit)|sum(Assist)|sum(Score)|
+---------+--------+-----------+----------+
| 负| 30| 34| 109|
| 胜| 215| 203| 692|
+---------+--------+-----------+----------+
文末致敬官方文档:http://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html#pyspark.sql.GroupedData
?