I am trying to write a sentiment-analysis program based on Spark. For this I use word2vec and KMeans clustering. From word2vec I got a collection of 20k words/vectors in 100-dimensional space, and now I am trying to cluster this vector space. When I run KMeans with the default parallel implementation, the algorithm runs for 3 hours! But with the random initialization strategy it takes only 8 minutes. What am I doing wrong? I have a MacBook Pro with a 4-core processor and 16 GB of RAM.

K ~= 4000, the maximum number of iterations was 20.
var vectors: Iterable[org.apache.spark.mllib.linalg.Vector] =
  model.getVectors.map(entry => new VectorWithLabel(entry._1, entry._2.map(_.toDouble)))
val data = sc.parallelize(vectors.toIndexedSeq).persist(StorageLevel.MEMORY_ONLY_2)
log.info("Clustering data size {}", data.count())
log.info("==================Train process started==================")
val clusterSize = modelSize / 5

val kmeans = new KMeans()
kmeans.setInitializationMode(KMeans.K_MEANS_PARALLEL)
kmeans.setK(clusterSize)
kmeans.setRuns(1)
kmeans.setMaxIterations(50)
kmeans.setEpsilon(1e-4)

time = System.currentTimeMillis()
val clusterModel: KMeansModel = kmeans.run(data)
The Spark context is initialized here:
val conf = new SparkConf()
  .setAppName("SparkPreProcessor")
  .setMaster("local[4]")
  .set("spark.default.parallelism", "8")
  .set("spark.executor.memory", "1g")
val sc = SparkContext.getOrCreate(conf)
A few updates on how I run this program: I run it from IntelliJ IDEA. I don't have a real Spark cluster, but I thought a personal machine could serve as one.
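One side note on this setup (an aside, not part of the original code): in local mode the driver and the executor share a single JVM, so spark.executor.memory has no effect; the usable heap is whatever the JVM was started with. When launching from IntelliJ that means setting -Xmx in the run configuration's VM options rather than in SparkConf. A sketch of the same config relying on the JVM options instead:

// In local[*] mode the heap comes from the driver JVM, so start it with
// e.g. -Xmx8g (IntelliJ: Run Configuration -> VM options). Setting
// "spark.executor.memory" in SparkConf does nothing for a local run.
val conf = new SparkConf()
  .setAppName("SparkPreProcessor")
  .setMaster("local[4]")
  .set("spark.default.parallelism", "8")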
I found that the program hangs inside this loop in the Spark code, in LocalKMeans.scala:
// Initialize centers by sampling using the k-means++ procedure.
centers(0) = pickWeighted(rand, points, weights).toDense
for (i <- 1 until k) {
  // Pick the next center with a probability proportional to cost under current centers
  val curCenters = centers.view.take(i)
  val sum = points.view.zip(weights).map { case (p, w) =>
    w * KMeans.pointCost(curCenters, p)
  }.sum
  val r = rand.nextDouble() * sum
  var cumulativeScore = 0.0
  var j = 0
  while (j < points.length && cumulativeScore < r) {
    cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
    j += 1
  }
  if (j == 0) {
    logWarning("kMeansPlusPlus initialization ran out of distinct points for centers." +
      s" Using duplicate point for center k = $i.")
    centers(i) = points(0).toDense
  } else {
    centers(i) = points(j - 1).toDense
  }
}
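To see why this loop can run for hours, count the distance computations. This is a back-of-the-envelope estimate using the sizes from the question; the exact candidate count in k-means|| depends on the oversampling rounds, but it is on the order of the data size here:

// Rough work estimate for the driver-side k-means++ loop above.
// Picking center i scans every candidate point, and each pointCost call
// against i existing centers is on the order of i * d operations, so the
// total is roughly sum over i of (n * i * d) ~ n * d * k^2 / 2, single-threaded.
val n = 20000L // candidate points (order of the data size for k ~ 4000)
val d = 100L   // vector dimensionality
val k = 4000L  // number of centers to seed
val ops = n * d * k * k / 2
println(f"~$ops%,d distance operations") // ~1.6e13: hours of work on one core

The random initializer skips this seeding entirely, which is consistent with the 3 hours vs. 8 minutes gap in the question.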
Answer:
I ran Spark on AWS with 3 slave nodes (c3.xlarge) and the result was the same: the problem is that parallel KMeans initializes the algorithm in N parallel runs, but the initialization step is still extremely slow for a small amount of data. My solution was to stick with the random initialization. The data size was roughly 4k clusters for 21k vectors of dimension 100.
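For reference, the workaround amounts to a one-line change. A minimal sketch against the same data RDD and clusterSize as in the question:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}

val kmeans = new KMeans()
  .setInitializationMode(KMeans.RANDOM) // skip the slow k-means|| / k-means++ seeding
  .setK(clusterSize)
  .setRuns(1)
  .setMaxIterations(50)
  .setEpsilon(1e-4)
val clusterModel: KMeansModel = kmeans.run(data)

Random initialization may need a few more Lloyd iterations to converge and can give a somewhat worse local optimum than k-means++ seeding, but for this data size the end-to-end time is far shorter.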