我有以下Python测试代码(ALS.train
的参数在其他地方定义):
r1 = (2, 1) r2 = (3, 1) test = sc.parallelize([r1, r2]) model = ALS.train(ratings, rank, numIter, lmbda) predictions = model.predictAll(test) print test.take(1) print predictions.count() print predictions
这个代码是有效的,因为predictions变量的计数为1,并且输出如下:
[(2, 1)]1ParallelCollectionRDD[2691] at parallelize at PythonRDD.scala:423
然而,当我尝试使用自己创建的RDD
时,使用以下代码,似乎不再有效:
model = ALS.train(ratings, rank, numIter, lmbda)validation_data = validation.map(lambda xs: tuple(int(x) for x in xs))predictions = model.predictAll(validation_data)print validation_data.take(1)print predictions.count()print validation_data
输出如下:
[(61, 3864)]0PythonRDD[4018] at RDD at PythonRDD.scala:43
如你所见,当传递映射后的RDD
时,predictAll
返回空。输入的值格式是相同的。我能看到的唯一明显的区别是第一个例子使用了parallelize,产生了ParallelCollectionRDD
,而第二个例子只是使用了map,产生了PythonRDD
。predictAll
是否只有在传递特定类型的RDD
时才有效?如果是的话,是否有可能在RDD
类型之间转换?我不确定如何使其工作。
回答:
MatrixFactorizationMode.predictAll
可能返回一个比输入项少的RDD的两个基本条件如下:
- 训练集中缺少用户。
- 训练集中缺少产品。
你可以轻松重现这种行为,并检查它与RDD的创建方式无关。首先让我们使用示例数据构建一个模型:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Ratingdef parse(s): x, y, z = s.split(",") return Rating(int(x), int(y), float(z))ratings = (sc.textFile("data/mllib/als/test.data") .map(parse) .union(sc.parallelize([Rating(1, 5, 4.0)])))model = ALS.train(ratings, 10, 10)
接下来让我们查看训练数据中存在的产品和用户:
set(ratings.map(lambda r: r.product).collect())## {1, 2, 3, 4, 5}set(ratings.map(lambda r: r.user).collect())## {1, 2, 3, 4}
现在让我们创建测试数据并检查预测结果:
valid_test = sc.parallelize([(2, 5), (1, 4), (3, 5)])valid_test## ParallelCollectionRDD[434] at parallelize at PythonRDD.scala:423model.predictAll(valid_test).count()## 3
到目前为止一切顺利。接下来让我们使用与你的代码相同的逻辑进行映射:
valid_test_ = valid_test.map(lambda xs: tuple(int(x) for x in xs))valid_test_## PythonRDD[497] at RDD at PythonRDD.scala:43model.predictAll(valid_test_).count()## 3
仍然正常。接下来让我们创建无效数据并重复实验:
invalid_test = sc.parallelize([ (2, 6), # 训练数据中没有产品 (6, 1) # 训练数据中没有用户])invalid_test ## ParallelCollectionRDD[500] at parallelize at PythonRDD.scala:423model.predictAll(invalid_test).count()## 0 invalid_test_ = invalid_test.map(lambda xs: tuple(int(x) for x in xs))model.predictAll(invalid_test_).count()## 0
如预期的那样,对于无效输入没有预测结果。
最后,你可以通过使用完全独立于Python代码的ML模型来确认确实如此:
from pyspark.ml.recommendation import ALS as MLALSmodel_ml = MLALS(rank=10, maxIter=10).fit( ratings.toDF(["user", "item", "rating"]))model_ml.transform((valid_test + invalid_test).toDF(["user", "item"])).show()## +----+----+----------+## |user|item|prediction|## +----+----+----------+## | 6| 1| NaN|## | 1| 4| 1.0184212|## | 2| 5| 4.0041084|## | 3| 5|0.40498763|## | 2| 6| NaN|## +----+----+----------+
如你所见,训练数据中没有对应的用户/产品意味着没有预测结果。