Apache Spark K-Means聚类 – 输入使用RDD

我试图对分组数据运行Spark的k-means聚类,但在尝试对每个组进行聚类时遇到了各种错误。

输入RDD的格式为(userID: Long, coords: [Vector]),例如:

org.apache.spark.rdd.RDD[(Long, Seq[org.apache.spark.mllib.linalg.Vector])]

Vector包含X Y坐标,即一对双精度数。我希望为每个UserID识别坐标聚类,因此我对RDD进行映射,并尝试对每个组运行k-means:

val userClusters = userCoordVectors.map {  case (userId, coords) =>    val clusters = 4    val iterations = 30    // 需要将coords转换为RDD以输入到K-Means中    val parsedData = sc.parallelize(coords)    // 应用k-means    val model = KMeans.train(parsedData, clusters, iterations)    ...    etc}

但当我运行这段代码时,在以下这行代码中得到了一个空指针异常(NPE):

val parsedData = sc.parallelize(coords)

问题在于,我必须将coords转换为RDD以进行K-Means操作。

另一方面,如果我先收集输入RDD,就不会得到NPE。相反,我会得到一个Java堆错误,大概是因为我将整个RDD具体化了。

 val userClusters = sc.parallelize(userCoordVectors.collect.map { ... })

在这里收集RDD中的数据似乎是不对的,所以我假设应该有更好的方法,但我不知道如何让parsedData这行代码正常工作。

有谁能看出我使用K-Means的方式有什么明显的错误,或者建议如何实现对每个组内数据进行聚类的目标?


回答:

你不能在RDD操作的任何函数中使用SparkContext或RDD。它们无法被序列化并通过网络发送。

Matei Zaharia在这里回答了这个问题:http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-get-a-spark-context-inside-a-mapper-td9605.html

你目前不能在Spark任务中使用SparkContext,所以在这种情况下,你必须调用某种本地K-means库。一个你可以尝试使用的例子是Weka(http://www.cs.waikato.ac.nz/ml/weka/)。然后你可以使用SparkContext.wholeTextFiles将你的文本文件加载为字符串的RDD,并对每个文件调用Weka。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注