Pyspark 并行 ml.KMeans 互相覆盖 K

我参考了一篇帖子,尝试并行运行 KMeans。我使用的是 Python 2.7 和 EMR 上的 Spark 2.0.2 版本。

如何在一个 SparkContext 中从不同的线程运行多个作业?

正如帖子中所引述的,从不同进程提交的作业不应该互相影响。

在一个给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从不同的线程提交的,它们可以同时运行。在本节中,我们所说的“作业”是指 Spark 操作(例如 save, collect)以及为了评估该操作需要运行的任何任务。Spark 的调度器完全是线程安全的,并且支持这种用例,以支持处理多个请求的应用程序(例如,为多个用户处理查询)。http://spark.apache.org/docs/latest/job-scheduling.html

然而,结果模型的聚类数 K 与传入的值不同。

代码:

from pyspark.ml.clustering import KMeansfrom sklearn.datasets.samples_generator import make_blobsfrom pyspark.ml.linalg import Vectorsimport randomrandom.seed(1)group_size = 30n_groups = 20n_samples= n_groups * group_sizen_features=2n_centers=4xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)x_groups = []for i in range(n_groups):    x_groups.append(xs[i*group_size: (i+1)*group_size])def do_kmean(xs):    data = []    for x in xs:        data.append((Vectors.dense(x.tolist()),) )    df = spark.createDataFrame(data, ["features"])    num_clusters = random.randint(5,10)    kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")    model = kmeans.fit(df)    return [num_clusters, kmeans.getK()]from multiprocessing.pool import ThreadPooltpool = ThreadPool(processes=8)result = tpool.map(do_kmean, x_groups)

结果:(输入的 K 与 KMeans 实际使用的 K 对比)

[[5, 9], [8, 9], [6, 8], [10, 9], [7, 9], [9, 9], [7, 9], [9, 9], [5, 5], [5, 9], [9, 7], [9, 9], [5, 7], [10, 5], [7, 7], [7, 7], [6, 6], [10, 10], [10, 10], [5, 5]]

看起来 Spark 并非线程/进程安全,并且会访问其他进程的 K 副本。是否有任何 Spark 配置会导致这个问题,或者这是 Spark 的一个 bug?


回答:

这确实是 Spark 2.0.2 和 2.1.0 的一个 bug。我在本地机器上使用上述两个版本成功复现了这个 bug。这个 bug 在 Spark 2.1.1 中已被修复。

https://issues.apache.org/jira/browse/SPARK-19348

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注