我试图对分组数据运行Spark的k-means聚类,但在尝试对每个组进行聚类时遇到了各种错误。
输入RDD的格式为(userID: Long, coords: [Vector]),例如:
org.apache.spark.rdd.RDD[(Long, Seq[org.apache.spark.mllib.linalg.Vector])]
Vector包含X Y坐标,即一对双精度数。我希望为每个UserID识别坐标聚类,因此我对RDD进行映射,并尝试对每个组运行k-means:
val userClusters = userCoordVectors.map { case (userId, coords) => val clusters = 4 val iterations = 30 // 需要将coords转换为RDD以输入到K-Means中 val parsedData = sc.parallelize(coords) // 应用k-means val model = KMeans.train(parsedData, clusters, iterations) ... etc}
但当我运行这段代码时,在以下这行代码中得到了一个空指针异常(NPE):
val parsedData = sc.parallelize(coords)
问题在于,我必须将coords转换为RDD以进行K-Means操作。
另一方面,如果我先收集输入RDD,就不会得到NPE。相反,我会得到一个Java堆错误,大概是因为我将整个RDD具体化了。
val userClusters = sc.parallelize(userCoordVectors.collect.map { ... })
在这里收集RDD中的数据似乎是不对的,所以我假设应该有更好的方法,但我不知道如何让parsedData这行代码正常工作。
有谁能看出我使用K-Means的方式有什么明显的错误,或者建议如何实现对每个组内数据进行聚类的目标?
回答:
你不能在RDD操作的任何函数中使用SparkContext或RDD。它们无法被序列化并通过网络发送。
Matei Zaharia在这里回答了这个问题:http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-get-a-spark-context-inside-a-mapper-td9605.html
你目前不能在Spark任务中使用SparkContext,所以在这种情况下,你必须调用某种本地K-means库。一个你可以尝试使用的例子是Weka(http://www.cs.waikato.ac.nz/ml/weka/)。然后你可以使用SparkContext.wholeTextFiles将你的文本文件加载为字符串的RDD,并对每个文件调用Weka。