当前位置: 代码迷 >> 综合 >> pyspark入门系列 - 06 pyspark.sql.functions.pandas_udf使用教程
  详细解决方案

pyspark入门系列 - 06 pyspark.sql.functions.pandas_udf使用教程

热度:73   发布时间:2023-10-11 11:39:17.0

本节来学习pyspark.sql.functions中的pandas_udf函数。博客中代码基于spark 2.4.4版本。不同版本函数会有不同,详细请参考官方文档。博客案例中用到的数据可以点击此处下载(提取码:2bd5)

pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)

pandas_udf是用户定义的函数,由Spark使用Arrow来传输数据,并使用Pandas来处理数据,从而实现矢量化操作。使用pandas_udf,可以方便的在PySpark和Pandas之间进行互操作,并且保证性能;其核心思想是将Apache Arrow作为序列化格式。

Pandas UDF通常表现为常规的PySpark函数API。

  • f: 用户定义的函数;
  • returnType: 用户自定义函数的返回值类型,该值可以是pyspark.sql.types.DataType对象或DDL格式的类型字符串
  • functionType: pyspark.sql.functions.PandasUDFType中的枚举值。 默认值:SCALAR.存在此参数是为了兼容性。 鼓励使用Python类型。

Pandas_UDF类型

目前,有三种类型的Pandas_UDF,分别是Scalar(标量映射),GROUPED_MAP(分组映射),GROUPED_AGG(分组聚合),对应的输入输出格式如下表

pandas_udf类型 输入 输出
SCALAR 一个或多个pd.Series 一个pd.Series
GROUPED_MAP pd.DataFrame pd.DataFrame
GROUPED_AGG 一个或多个pd.Series SCALAR
# 在学习之前先导入必要的包和数据
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType
import pandas as pd
from pyspark.sql.functions import pandas_udf, udfspark = SparkSession.Builder().master('local').appName('GroupedData').getOrCreate()
df = spark.read.csv('../data/data.csv', header=True)df = df.withColumn('投篮数', df['投篮数'].astype(IntegerType()))
df = df.withColumn('命中', df['命中'].astype(IntegerType()))
df = df.withColumn('投篮命中率', df['投篮命中率'].astype(FloatType()))
df = df.withColumn('3分命中率', df['3分命中率'].astype(FloatType()))
df = df.withColumn('篮板', df['篮板'].astype(IntegerType()))
df = df.withColumn('助攻', df['助攻'].astype(IntegerType()))
df = df.withColumn('得分', df['得分'].astype(IntegerType()))

使用方式

使用pandas_udf作为装饰器来定义Pandas UDF

注意:在spark 3.0之前,类型提示都应使用functionType指定pandas UDF类型。

1. Scalar(标量类型)

Scalar UDF定义了一个转换,函数输入一个或多个pd.Series,输出一个pd.Series,函数的输出和输入有相同的长度

Scalar Pandas UDF用于向量化标量操作。常常与select和withColumn等函数一起使用。其中调用的Python函数需要使用pandas.Series作为输入并返回一个具有相同长度的pandas.Series。具体执行流程是,Spark将列分成批(每批进行向量化计算的数据量由 spark.sql.execution.arrow.maxRecordsPerBatch 参数控制,默认为10000条),并将每个批作为数据的子集进行函数的调用,进而执行panda UDF,最后将结果连接在一起。

from pyspark.sql.functions import PandasUDFType@pandas_udf(returnType=IntegerType(), functionType=PandasUDFType.SCALAR)
def total_shoot(x, y):return x + ydf.withColumn("篮板+助攻", total_shoot("篮板", "助攻")).show(2)
+---+----+----+------+----+------+----------+---------+----+----+----+---------+
|_c0|对手|胜负|主客场|命中|投篮数|投篮命中率|3分命中率|篮板|助攻|得分|篮板+助攻|
+---+----+----+------+----+------+----------+---------+----+----+----+---------+
|  0|勇士|  胜|    客|  10|    23|     0.435|    0.444|   6|  11|  27|       17|
|  1|国王|  胜|    客|   8|    21|     0.381|    0.286|   3|   9|  27|       12|
+---+----+----+------+----+------+----------+---------+----+----+----+---------+
only showing top 2 rows

?

2. Grouped Map(分组映射)

GROUPED_MAP UDF定义了转换:pd.DataFrame -> pd.DataFrame,returnType使用StructType描述返回值的pd.DataFrame的模式。
Grouped Map(分组映射)panda_udf与groupBy().apply()一起使用,后者实现了“split-apply-combine”模式。
“split-apply-combine”包括三个步骤:

  • 使用DataFrame.groupBy将数据分成多个组。
  • 对每个分组应用一个函数。函数的输入和输出都是pandas.DataFrame。输入数据包含每个组的所有行和列。
  • 将结果合并到一个新的DataFrame中。

要使用groupBy().apply(),需要定义以下内容:

  • 定义每个分组的Python计算函数,这里可以使用pandas包或者Python自带方法。
  • 一个StructType对象或字符串,它定义输出DataFrame的格式,包括输出特征以及特征类型。
    需要注意的是,StructType对象中的Dataframe特征顺序需要与分组中的Python计算函数返回特征顺序保持一致。

此外,在应用该函数之前,分组中的所有数据都会加载到内存,这可能导致内存不足抛出异常。

from pyspark.sql import functions as F@pandas_udf('win_flag int, score double', functionType=PandasUDFType.GROUPED_MAP)
def normalize(pdf):v = pdf['score']return pdf.assign(score=(v - v.mean() / v.std()))# 实验中中文列名会报错,所以尝试英文列名
df = df.withColumn('win_flag', F.when(df['胜负'] == '胜', 1).otherwise(0))
df = df.withColumn('score', df['得分'])
df.select('win_flag', 'score').groupby('win_flag').apply(normalize).show(3)
+--------+-----------------+
|win_flag|            score|
+--------+-----------------+
|       1|23.21879940059463|
|       1|23.21879940059463|
|       1|25.21879940059463|
+--------+-----------------+
only showing top 3 rows

?

3. GROUPED_AGG

GROUPED_AGG定义了一个或多个pandas.Series -> 一个scalar,scalar的返回值类型(returnType)应该是原始数据类型

Grouped aggregate Panda UDF类似于Spark聚合函数。Grouped aggregate Panda UDF常常与groupBy().agg()和pyspark.sql.window一起使用。 需要注意的是,这种类型的UDF不支持部分聚合,组或窗口的所有数据都将加载到内存中。此外,目前只支持Grouped aggregate Pandas UDFs的无界窗口。 下面的例子展示了如何使用这种类型的UDF来计算groupBy和窗口操作的平均值

# 统计 胜 和 负 的平均分
@pandas_udf('int', PandasUDFType.GROUPED_AGG)
def count_num(v):return v.mean()df.groupby("胜负").agg(count_num(df['得分']).alias('avg_score')).show(2)
+----+---------+
|胜负|avg_score|
+----+---------+
|  负|       27|
|  胜|       32|
+----+---------+

?

文末致敬官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf

  相关解决方案