使用reduceByKey()替代groupByKey()

这是对这里的跟进问题。我正在尝试基于这个实现来实现k-means算法。它运行得很好,但是我想用reduceByKey()替换groupByKey(),但我不确定如何做(目前我并不担心性能问题)。以下是相关的简化代码:

val data = sc.textFile("dense.txt").map(        t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()val read_mean_centroids = sc.textFile("centroids.txt").map(        t => (t.split("#")(0), parseVector(t.split("#")(1))))var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)do {    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()    ..

注意,println(newCentroids)会输出:

Map(23 -> (-6.269305E-4, -0.0011746404, -4.08004E-5), 8 -> (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0.0016383086, -0.0016974678, 1.45..

println(closest)会输出:

MapPartitionsRDD[6] at map at kmeans.scala:75

相关问题:在Apache Spark (Scala)中使用reduceByKey


一些文档

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

使用关联的reduce函数合并每个键的值。

def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]

使用关联的reduce函数合并每个键的值。

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]

使用关联的reduce函数合并每个键的值。

def groupByKey(): RDD[(K, Iterable[V])]

将RDD中每个键的值组合成一个单一的序列。


回答:

你可以使用aggregateByKey()(比reduceByKey()更自然)来计算newCentroids,如下所示:

val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(  (agg, v) => (agg._1 += v, agg._2 + 1L),  (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)).mapValues(agg => agg._1/agg._2).collectAsMap 

为了使其工作,你需要计算数据的维度,即dim,但你只需计算一次。你可以使用类似val dim = data.first._2.length的代码来实现。

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注