当前位置: 代码迷 >> 综合 >> pyspark入门系列 - 07 pyspark.sql.GroupedData函数汇总
  详细解决方案

pyspark入门系列 - 07 pyspark.sql.GroupedData函数汇总

热度:123   发布时间:2023-10-11 11:38:08.0

本节来学习pyspark.sql中的Grouped_Data类型的函数。博客中代码基于spark 2.4.4版本。不同版本函数会有不同,详细请参考官方文档。博客案例中用到的数据可以点击此处下载(提取码:h6gg)

GroupedData(jgd,df)

是由DataFrame.groupBy()创建的一组在DataFrame上聚合的方法
pyspark入门系列 - 07 pyspark.sql.GroupedData函数汇总

from pyspark.sql import SparkSession
import pyspark.sql.types as typspark = SparkSession.Builder().master('local').appName('GroupedData').getOrCreate()
input_types = [('Opponent', typ.StringType()),('WinOrLose', typ.StringType()),('HomeAndAway', typ.StringType()),('Hit', typ.IntegerType()),('Shots', typ.IntegerType()),('FieldGoalPercentage', typ.FloatType()),('3PointShooting', typ.FloatType()),('Backboard', typ.IntegerType()),('Assist', typ.IntegerType()),('Score', typ.IntegerType()),]schema = typ.StructType([typ.StructField(e[0], e[1], True) for e in input_types])df = spark.read.csv('../data/dat.csv', header=True, schema=schema)
df.printSchema()
root|-- Opponent: string (nullable = true)|-- WinOrLose: string (nullable = true)|-- HomeAndAway: string (nullable = true)|-- Hit: integer (nullable = true)|-- Shots: integer (nullable = true)|-- FieldGoalPercentage: float (nullable = true)|-- 3PointShooting: float (nullable = true)|-- Backboard: integer (nullable = true)|-- Assist: integer (nullable = true)|-- Score: integer (nullable = true)

?

agg(*exprs)

计算聚合并将结果作为DataFrame返回。

可用的聚合函数可以是:

  • 内置的汇总函数,例如avg,max,min,sum,count
  • 使用pyspark.sql.functions.pandas_udf()创建的集合聚合pandas UDF

如果exprs是从字符串到字符串的单个dict映射,则键是要对其执行聚合的列,而值是聚合函数。

另外,exprs也可以是聚合列表达式的列表

1. 内置函数

grouped_df = df.groupby('WinOrLose')
grouped_df.agg({
    '3PointShooting': 'max', 'Shots': 'avg', 'Assist': 'count', 'Score': 'sum'}).show()
+---------+----------+------------------+-------------------+-------------+
|WinOrLose|sum(Score)|        avg(Shots)|max(3PointShooting)|count(Assist)|
+---------+----------+------------------+-------------------+-------------+
|       负|       109|             21.25|              0.429|            4|
|       胜|       692|21.142857142857142|              0.875|           21|
+---------+----------+------------------+-------------------+-------------+

?

from pyspark.sql import functions as Fgrouped_df.agg(F.max(df['3PointShooting']).alias('3PointShooting_max'), F.mean(df['Shots']).alias('Shots_mean'), F.count(df['Assist']).alias('Assist_count'), F.sum(df['Score']).alias('Score_sum')).show()
+---------+------------------+------------------+------------+---------+
|WinOrLose|3PointShooting_max|        Shots_mean|Assist_count|Score_sum|
+---------+------------------+------------------+------------+---------+
|       负|             0.429|             21.25|           4|      109|
|       胜|             0.875|21.142857142857142|          21|      692|
+---------+------------------+------------------+------------+---------+

?

2. pandas_udf

from pyspark.sql.functions import pandas_udf, PandasUDFType@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def min_score(v):return v.min()grouped_df.agg(min_score(df['Score']).alias('min_score')).show()
+---------+---------+
|WinOrLose|min_score|
+---------+---------+
|       负|       20|
|       胜|       21|
+---------+---------+

?

apply(udf)

pyspark.sql.GroupedData.applyInPandas()的别名。

它需要一个pyspark.sql.functions.pandas_udf(),使用pandas udf映射当前DataFrame的每组,并将结果作为DataFrame返回

注意:Grouped_Data.apply()中的udf函数只能是GROUPED_MAP类型

import pandas as pd
from pyspark.sql.functions import col@pandas_udf(returnType='WinOrLose string , Shots float, Score int' , functionType=PandasUDFType.GROUPED_MAP)
def normalize_udf(pdf):Shots = pdf['Shots']Score = pdf['Score']pdf.assign(Shots_Standard=(Shots - Shots.mean()) / Shots.std())pdf.assign(Score_Standard=(Score - Score.mean()) / Score.std())return pdf df.select('WinOrLose', 'Shots', 'Score').groupby('WinOrLose').apply(normalize_udf).show() 
+---------+-----+-----+
|WinOrLose|Shots|Score|
+---------+-----+-----+
|       负| 20.0|   22|
|       负| 19.0|   20|
|       负| 21.0|   29|
|       负| 25.0|   38|
|       胜| 23.0|   27|
|       胜| 21.0|   27|
|       胜| 19.0|   29|
|       胜| 20.0|   27|
|       胜| 18.0|   27|
|       胜| 23.0|   31|
|       胜| 15.0|   29|
|       胜| 25.0|   56|
|       胜| 21.0|   35|
|       胜| 25.0|   38|
|       胜| 21.0|   26|
|       胜| 22.0|   48|
|       胜| 20.0|   29|
|       胜| 16.0|   21|
|       胜| 27.0|   37|
|       胜| 20.0|   37|
+---------+-----+-----+
only showing top 20 rows

?

pivot(pivot_col, values=None)

旋转当前DataFrame的一列并执行指定的聚合。

  • pivot_col: 用来povit的列名
  • values:值列表,这些值将转换为输出DataFrame中的列
grouped_df.pivot(pivot_col='HomeAndAway').count().show()
+---------+---+---+
|WinOrLose| 主| 客|
+---------+---+---+
|       负|  3|  1|
|       胜|  9| 12|
+---------+---+---+

?

applyInPandas(func, schema)

pyspark.sql.GroupedData.applyInPandas()需要一个Python函数。推荐使用pyspark.sql.GroupedData.applyInPandas() API

使用pandas udf映射当前DataFrame的每个组,并将结果作为DataFrame返回。注意:spark 3.0才有此函数。

  • func:一个Python原生函数,采用pandas.DataFrame并输出pandas.DataFrame。
  • schema: PySpark中的func的返回类型。 该值可以是pyspark.sql.types.DataType对象或DDL格式的类型字符串

avg(*cols)

对每个分组中的每个数值列计算平均值。别名mean()

grouped_df.avg('Hit', 'Assist', 'Score').show()
+---------+------------------+-----------------+-----------------+
|WinOrLose|          avg(Hit)|      avg(Assist)|       avg(Score)|
+---------+------------------+-----------------+-----------------+
|       负|               7.5|              8.5|            27.25|
|       胜|10.238095238095237|9.666666666666666|32.95238095238095|
+---------+------------------+-----------------+-----------------+

?

count()

计算每个分组的记录数

grouped_df.count().show()
+----+-----+
|胜负|count|
+----+-----+
|  负|    4|
|  胜|   21|
+----+-----+

?

min(*cols)

计算每个分组中每个数值列的最小值

grouped_df.min('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|min(Hit)|min(Assist)|min(Score)|
+---------+--------+-----------+----------+
|       负|       6|          7|        20|
|       胜|       6|          3|        21|
+---------+--------+-----------+----------+

?

max(*cols)

计算每个分组中每个数值列的最大值

grouped_df.max('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|max(Hit)|max(Assist)|max(Score)|
+---------+--------+-----------+----------+
|       负|       8|         11|        38|
|       胜|      19|         17|        56|
+---------+--------+-----------+----------+

?

mean(*cols)

等价于avg()

grouped_df.mean('Hit', 'Assist', 'Score').show()
+---------+------------------+-----------------+-----------------+
|WinOrLose|          avg(Hit)|      avg(Assist)|       avg(Score)|
+---------+------------------+-----------------+-----------------+
|       负|               7.5|              8.5|            27.25|
|       胜|10.238095238095237|9.666666666666666|32.95238095238095|
+---------+------------------+-----------------+-----------------+

?

sum(*cols)

计算每个分组中每个数值列的和

grouped_df.sum('Hit', 'Assist', 'Score').show()
+---------+--------+-----------+----------+
|WinOrLose|sum(Hit)|sum(Assist)|sum(Score)|
+---------+--------+-----------+----------+
|       负|      30|         34|       109|
|       胜|     215|        203|       692|
+---------+--------+-----------+----------+

文末致敬官方文档:http://spark.apache.org/docs/2.4.4/api/python/pyspark.sql.html#pyspark.sql.GroupedData
?