Pyspark 并行 ml.KMeans 互相覆盖 K

我参考了一篇帖子,尝试并行运行 KMeans。我使用的是 Python 2.7 和 EMR 上的 Spark 2.0.2 版本。

如何在一个 SparkContext 中从不同的线程运行多个作业?

正如帖子中所引述的,从不同进程提交的作业不应该互相影响。

在一个给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从不同的线程提交的,它们可以同时运行。在本节中,我们所说的“作业”是指 Spark 操作(例如 save, collect)以及为了评估该操作需要运行的任何任务。Spark 的调度器完全是线程安全的,并且支持这种用例,以支持处理多个请求的应用程序(例如,为多个用户处理查询)。http://spark.apache.org/docs/latest/job-scheduling.html

然而,结果模型的聚类数 K 与传入的值不同。

代码:

from pyspark.ml.clustering import KMeansfrom sklearn.datasets.samples_generator import make_blobsfrom pyspark.ml.linalg import Vectorsimport randomrandom.seed(1)group_size = 30n_groups = 20n_samples= n_groups * group_sizen_features=2n_centers=4xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)x_groups = []for i in range(n_groups):    x_groups.append(xs[i*group_size: (i+1)*group_size])def do_kmean(xs):    data = []    for x in xs:        data.append((Vectors.dense(x.tolist()),) )    df = spark.createDataFrame(data, ["features"])    num_clusters = random.randint(5,10)    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")    model = kmeans.fit(df)    return [num_clusters, kmeans.getK()]from multiprocessing.pool import ThreadPooltpool = ThreadPool(processes=8)result = tpool.map(do_kmean, x_groups)

结果:(输入的 K 与 KMeans 实际使用的 K 对比)

[[5, 9], [8, 9], [6, 8], [10, 9], [7, 9], [9, 9], [7, 9], [9, 9], [5, 5], [5, 9], [9, 7], [9, 9], [5, 7], [10, 5], [7, 7], [7, 7], [6, 6], [10, 10], [10, 10], [5, 5]]

看起来 Spark 并非线程/进程安全,并且会访问其他进程的 K 副本。是否有任何 Spark 配置会导致这个问题,或者这是 Spark 的一个 bug?


回答:

这确实是 Spark 2.0.2 和 2.1.0 的一个 bug。我在本地机器上使用上述两个版本成功复现了这个 bug。这个 bug 在 Spark 2.1.1 中已被修复。

https://issues.apache.org/jira/browse/SPARK-19348

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注