我正在使用Spark Scala来计算数据框行之间的余弦相似性。
数据框的架构如下:
root |-- itemId: string (nullable = true) |-- features: vector (nullable = true)
数据框的样例如下
+-------+--------------------+ | itemId| features| +-------+--------------------+ | ab |[4.7143,0.0,5.785...| | cd |[5.5,0.0,6.4286,4...| | ef |[4.7143,1.4286,6....| ........ +-------+--------------------+
计算余弦相似性的代码如下:
val irm = new IndexedRowMatrix(myDataframe.rdd.zipWithIndex().map { case (row, index) => IndexedRow(row.getAs[Vector]("features"), index)}).toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities
在irm矩阵中,我有(i, j, score),其中i, j是我原始数据框中项目i和j的索引。我希望得到(itemIdA, itemIdB, score),其中itemIdA和itemIdB分别是索引i和j的ID,通过将irm与初始数据框连接,或者如果有更好的选项?
回答:
在将数据框转换为矩阵之前创建行索引,并创建索引与ID之间的映射。计算完成后,使用创建的Map
将列索引(之前是行索引,但通过transpose
更改)转换为ID。
val rdd = myDataframe.as[(String, org.apache.spark.mllib.linalg.Vector)].rdd.zipWithIndex()val indexMap = rdd.map{case ((id, vec), index) => (index, id)}.collectAsMap()
像之前一样使用以下代码计算余弦相似性:
val irm = new IndexedRowMatrix(rdd.map{case ((id, vec), index) => IndexedRow(index, vec)}) .toCoordinateMatrix().transpose().toRowMatrix().columnSimilarities()
将列索引转换回ID:
irm.entries.map(e => (indexMap(e.i), indexMap(e.j), e.value))
这应该能给你想要的结果。