问题描述
我有一个pyspark.sql.dataframe,其中每一行都是一篇新闻文章。 然后我有一个RDD代表每篇文章中包含的单词。 我想将单词的RDD添加为名为“words”的列到我的新文章的数据框中。 我试过了
df.withColumn('words', words_rdd )
但我得到了错误
AssertionError: col should be Column
DataFrame看起来像这样
Articles
the cat and dog ran
we went to the park
today it will rain
但我有3k新闻文章。
我应用了一个函数来清理文本,例如删除停用词,我有一个如下所示的RDD:
[[cat, dog, ran],[we, went, park],[today, will, rain]]
我试图让我的Dataframe看起来像这样:
Articles Words
the cat and dog ran [cat, dog, ran]
we went to the park [we, went, park]
today it will rain [today, will, rain]
1楼
免责声明 :
Spark DataFrame
一般没有严格定义的顺序。
使用风险由您自己承担。
将索引添加到现有DataFrame
:
from pyspark.sql.types import *
df_index = spark.createDataFrame(
df.rdd.zipWithIndex(),
StructType([StructField("data", df.schema), StructField("id", LongType())])
)
将索引添加到RDD
并转换为DataFrame
:
words_df = spark.createDataFrame(
words_rdd.zipWithIndex(),
StructType([
StructField("words", ArrayType(StringType())),
StructField("id", LongType())
])
)
加入并选择必填字段:
df_index.join(words_df, "id").select("data.*", "words")
警告
有不同的解决方案,可能在特定情况下有效,但不保证性能和/或正确性。 这些包括:
-
使用
monotonically_increasing_id
作为join
键 - 通常情况下不正确。 -
使用
row_number()
窗口函数作为连接键 - 不可接受的性能影响,如果没有定义特定的顺序,通常不正确。 -
在
RDDs
上使用zip
- 当且仅当两个结构具有相同的数据分布时才能工作(在这种情况下应该有效)。
注意 :
在这种特定情况下,您不应该需要RDD
。
pyspark.ml.feature
提供各种Transformers
,应该适合你。
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
df = spark.createDataFrame(
["the cat and dog ran", "we went to the park", "today it will rain"],
"string"
).toDF("Articles")
Pipeline(stages=[
RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# | Articles| Tokens| Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...| [went, park]|
# | today it will rain|[today, it, will,...| [today, rain]|
# +-------------------+--------------------+---------------+
可以使用StopWordsRemover
stopWords
参数提供停用词列表,例如:
StopWordsRemover(
inputCol="Tokens",
outputCol="Words",
stopWords=["the", "and", "we", "to", "it"]
)
2楼
为什么要将rdd加入到数据框中,我宁愿直接从“Articles”创建一个新列。 有多种方法可以做到这一点,这是我的5美分:
from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc) # sc is the sparkcontext
x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)
df2 = df.map(lambda x:tuple([x.Articles,x.Articles.split(' ')])).toDF(['Articles','words'])
df2.show()
您将获得以下输出:
Articles words
the cat and dog ran [the, cat, and, dog, ran]
we went to the park [we, went, to, the, park]
today it will rain [today, it, will, rain]
如果你想要实现其他目标,请告诉我。
3楼
一个简单的方法,但有效的是使用 。 您可以:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None],
"string" ).toDF("Articles")
split_words = udf(lambda x : x.split(' ') if x is not None else x, StringType())
df = df.withColumn('Words', split_words(df['Articles']))
df.show(10,False)
>>
+-------------------+-------------------------+
|Articles |Words |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain] |
|null |null |
+-------------------+-------------------------+
我添加了检查无,因为通常在您的数据中有坏行。 您可以在拆分之后或之前使用dropna轻松放下它们。
但在我看来,如果您想将此作为文本分析的准备任务,那么建立管道可能符合您的最佳利益,因为@ user9613318在他的回答中建议
4楼
rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])
# make some transformation on rdd1:
rdd2 = rdd.map(lambda n: True if n % 2 else False)
# Append each row in rdd2 to those in rdd1.
rdd1.zip(rdd2).collect()