当前位置: 代码迷 >> 综合 >> 【Pyspark】Dataframe添加新的一列
  详细解决方案

【Pyspark】Dataframe添加新的一列

热度:73   发布时间:2024-01-26 09:31:24.0

1. lit 添加常量 字符串

若需要添加一列固定值,比如地名、邮编、标号、字符串之类的,可以直接使用lit 添加常量

Eg: 添加字符串

import pyspark.sql.functions as Fd7=d61.withColumn('line_results',F.lit(string_line))

Eg: 添加常量10

from pyspark.sql.functions import litdf.withColumn('new_column', lit(10))

 

2. withColumn + UDF 添加函数处理后的结果

另一种方法是利用UDF(user defined function)模块。

 

通过UDF函数对输入进行处理后,将输出添加到原始df中

eg :

df_spark_route_info.withColumn("route_detail_new",rearrange_routedetail(df["list_string"]))

其中,rearrange_routedetail为定义好的udf函数,输入为df["list_string"], 处理后输出为route_detail_new,并以route_detail_new为列名被添加在df_spark_route_info 中

 

eg:

https://www.jianshu.com/p/0a5d7ca2b575

https://www.jianshu.com/p/bded081b5350

除了withColumn方法,还可以利用spark的udf模块添加新的列。在本例中,还需要添加相应的时间列,此时withColumn方法并不适用,需要导入udf方法,该方法有两个参数,分别为自定义的函数名及返回值类型。

global idx
idx = 0
date = gettime()
def set_date(x):global idx  # 将idx设置为全局变量if x is not None:idx += 1return date[idx - 1]
index = udf(set_date, StringType())
yHat = yHat.withColumn("date", index(yHat["pm25"]))

 

3. withColumn 添加简单加减乘除等结果

https://www.jianshu.com/p/0a5d7ca2b575

常见的一种方法是调用dataframe的方法,但是该方法存在一定的限制,即新添加的列只能根据现有列转换 eg 加减乘除 得到;

Eg:

yHat = yHat.withColumn("pm25", yHat["pm25"]*(maxValue - minValue) + minValue)
yHat = yHat.withColumn("pm10", yHat["pm25"] + 10)
yHat = yHat.withColumn("CO", yHat["pm25"] + 20)
yHat = yHat.withColumn("NO2", yHat["pm25"] + 30)
yHat = yHat.withColumn("NO", yHat["pm25"] + 40)
yHat = yHat.withColumn("SO2", yHat["pm25"] + 50)

 

其只能根据已经存在的列添加, 不好用

https://codeday.me/bug/20180127/122213.html

  相关解决方案