我参考了一篇帖子,尝试并行运行 KMeans。我使用的是 Python 2.7 和 EMR 上的 Spark 2.0.2 版本。
如何在一个 SparkContext 中从不同的线程运行多个作业?
正如帖子中所引述的,从不同进程提交的作业不应该互相影响。
在一个给定的 Spark 应用程序(SparkContext 实例)中,如果多个并行作业是从不同的线程提交的,它们可以同时运行。在本节中,我们所说的“作业”是指 Spark 操作(例如 save, collect)以及为了评估该操作需要运行的任何任务。Spark 的调度器完全是线程安全的,并且支持这种用例,以支持处理多个请求的应用程序(例如,为多个用户处理查询)。http://spark.apache.org/docs/latest/job-scheduling.html
然而,结果模型的聚类数 K 与传入的值不同。
代码:
from pyspark.ml.clustering import KMeansfrom sklearn.datasets.samples_generator import make_blobsfrom pyspark.ml.linalg import Vectorsimport randomrandom.seed(1)group_size = 30n_groups = 20n_samples= n_groups * group_sizen_features=2n_centers=4xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)x_groups = []for i in range(n_groups): x_groups.append(xs[i*group_size: (i+1)*group_size])def do_kmean(xs): data = [] for x in xs: data.append((Vectors.dense(x.tolist()),) ) df = spark.createDataFrame(data, ["features"]) num_clusters = random.randint(5,10) kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction") model = kmeans.fit(df) return [num_clusters, kmeans.getK()]from multiprocessing.pool import ThreadPooltpool = ThreadPool(processes=8)result = tpool.map(do_kmean, x_groups)
结果:(输入的 K 与 KMeans 实际使用的 K 对比)
[[5, 9], [8, 9], [6, 8], [10, 9], [7, 9], [9, 9], [7, 9], [9, 9], [5, 5], [5, 9], [9, 7], [9, 9], [5, 7], [10, 5], [7, 7], [7, 7], [6, 6], [10, 10], [10, 10], [5, 5]]
看起来 Spark 并非线程/进程安全,并且会访问其他进程的 K 副本。是否有任何 Spark 配置会导致这个问题,或者这是 Spark 的一个 bug?
回答:
这确实是 Spark 2.0.2 和 2.1.0 的一个 bug。我在本地机器上使用上述两个版本成功复现了这个 bug。这个 bug 在 Spark 2.1.1 中已被修复。