我有一个DataFrame
,其中包含用户对电影的评分(从1到5)。为了得到一个DataFrame
,其中第一列是电影ID,其余列是每个用户对该电影的评分,我执行以下操作:
val ratingsPerMovieDF = imdbRatingsDF .groupBy("imdbId") .pivot("userId") .max("rating")
现在,我得到了一个DataFrame
,其中大多数值是null
,因为大多数用户只对少数电影进行了评分。
我对计算这些电影之间的相似性(基于项目的协同过滤)很感兴趣。
我尝试使用评分列的值组装一个RowMatrix
(用于进一步使用mllib进行相似性计算),然而,我不知道如何处理null
值。
以下是我尝试为每一行获取一个向量的代码:
val assembler = new VectorAssembler() .setInputCols(movieRatingsDF.columns.drop("imdbId")) .setOutputCol("ratings")val ratingsDF = assembler.transform(movieRatingsDF).select("imdbId", "ratings")
这给我带来了一个错误:
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
我可以使用.na.fill(0)
将它们替换为0,但这会导致不正确的相关性结果,因为几乎所有向量都会变得非常相似。
在这种情况下,有人能建议我该怎么做吗?最终目标是计算行之间的相关性。我在考虑使用SparseVectors
(以忽略null
值),但我不知道该怎么做。
我是Spark和Scala的新手,所以有些内容可能不太清楚。我正在努力更好地理解这些内容。
回答:
我认为你的方法是错误的。处理Spark API的细微差别是次要的,重要的是正确定义问题——在稀疏数据的情况下,你所说的相关性到底是什么意思。
在明确反馈(评分)的情况下用零填充数据是有问题的,不是因为所有Vectors
都会变得非常相似(度量的变化将由现有评分驱动,结果总是可以使用最小-最大缩放器重新缩放),而是因为它引入了原始数据集中不存在的信息。未评分的项目和最低评分的项目之间存在显著差异。
总的来说,你可以从两个方面来处理这个问题:
-
你可以仅使用两个项目都有非缺失值的条目来计算成对相似性。如果数据集足够密集,这应该能很好地工作。这可以通过在输入数据集上进行自连接来表达。以下是伪代码:
imdbRatingsDF.alias("left") .join(imdbRatingsDF.alias("right"), Seq("userId")) .where($"left.imdbId" =!= $"right.imdbId") .groupBy($"left.imdbId", $"right.imdbId") .agg(simlarity($"left.rating", $"right.rating"))
其中
similarity
实现所需的相似性度量。 -
你可以填补缺失的评分,例如使用某种中心趋势的度量。使用平均值(用平均值替换缺失值 – Spark DataFrame)可能是最自然的选择。
更高级的填补技术可能会提供更可靠的结果,但可能在分布式系统中不会很好地扩展。
注意
使用SparseVectors
本质上等同于na.fill(0)
。