我在Spark中有两个数据框,分别是train和test。两者中都有一个分类列,假设为Product_ID
,我想做的就是,对于那些在test中但不在train中的类别,将其值设为-1
。为此,我首先在p_not_in_test
中找到了该列的不同类别。但我无法继续进行。该怎么做呢?
p_not_in_test = test.select('Product_ID').subtract(train.select('Product_ID'))p_not_in_test = p_not_in_test.distinct()
此致
回答:
这是一个可复现的示例,首先我们创建虚拟数据:
test = sc.parallelize([("ID1", 1,5),("ID2", 2,4), ("ID3", 5,8),("ID4", 9,0), ("ID5", 0,3)]).toDF(["PRODUCT_ID", "val1", "val2"])train = sc.parallelize([("ID1", 4,7),("ID3", 1,4), ("ID5", 9,2)]).toDF(["PRODUCT_ID", "val1", "val2"])
现在我们需要扩展p_not_in_test
的定义,以便得到一个列表作为输出:
p_not_in_test = (test.select('PRODUCT_ID') .subtract(train.select('PRODUCT_ID')) .rdd.map(lambda x: x[0]).collect())
最后,我们可以创建一个udf
,它将在train
中不存在的每个ID
前添加"-1"
。
from pyspark.sql.types import StringTypefrom pyspark.sql.functions import udfaddString = udf(lambda x: '-1 ' + x if x in p_not_in_test else x, StringType())test.withColumn("NEW_ID",addString(test["PRODUCT_ID"])).show()+----------+----+----+------+|PRODUCT_ID|val1|val2|NEW_ID|+----------+----+----+------+| ID1| 1| 5| ID1|| ID2| 2| 4|-1 ID2|| ID3| 5| 8| ID3|| ID4| 9| 0|-1 ID4|| ID5| 0| 3| ID5|+----------+----+----+------+