Apache Spark K-Means聚类 – 输入使用RDD

我试图对分组数据运行Spark的k-means聚类,但在尝试对每个组进行聚类时遇到了各种错误。

输入RDD的格式为(userID: Long, coords: [Vector]),例如:

org.apache.spark.rdd.RDD[(Long, Seq[org.apache.spark.mllib.linalg.Vector])]

Vector包含X Y坐标,即一对双精度数。我希望为每个UserID识别坐标聚类,因此我对RDD进行映射,并尝试对每个组运行k-means:

val userClusters = userCoordVectors.map {  case (userId, coords) =>    val clusters = 4    val iterations = 30    // 需要将coords转换为RDD以输入到K-Means中    val parsedData = sc.parallelize(coords)    // 应用k-means    val model = KMeans.train(parsedData, clusters, iterations)    ...    etc}

但当我运行这段代码时,在以下这行代码中得到了一个空指针异常(NPE):

val parsedData = sc.parallelize(coords)

问题在于,我必须将coords转换为RDD以进行K-Means操作。

另一方面,如果我先收集输入RDD,就不会得到NPE。相反,我会得到一个Java堆错误,大概是因为我将整个RDD具体化了。

 val userClusters = sc.parallelize(userCoordVectors.collect.map { ... })

在这里收集RDD中的数据似乎是不对的,所以我假设应该有更好的方法,但我不知道如何让parsedData这行代码正常工作。

有谁能看出我使用K-Means的方式有什么明显的错误,或者建议如何实现对每个组内数据进行聚类的目标?


回答:

你不能在RDD操作的任何函数中使用SparkContext或RDD。它们无法被序列化并通过网络发送。

Matei Zaharia在这里回答了这个问题:http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-get-a-spark-context-inside-a-mapper-td9605.html

你目前不能在Spark任务中使用SparkContext,所以在这种情况下,你必须调用某种本地K-means库。一个你可以尝试使用的例子是Weka(http://www.cs.waikato.ac.nz/ml/weka/)。然后你可以使用SparkContext.wholeTextFiles将你的文本文件加载为字符串的RDD,并对每个文件调用Weka。

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注