I'm completely new to Spark and am trying to write a simple Python program that runs KMeans clustering on a dataset.
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SQLContext
    import re
    from pyspark.mllib.clustering import KMeans, KMeansModel
    from pyspark.mllib.linalg import DenseVector
    from pyspark.mllib.linalg import SparseVector
    from numpy import array
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.feature import MinMaxScaler
    import pandas as pd
    import numpy

    df = pd.read_csv("/<path>/Wholesale_customers_data.csv")
    sql_sc = SQLContext(sc)
    cols = ["Channel", "Region", "Fresh", "Milk", "Grocery", "Frozen", "Detergents_Paper", "Delicassen"]
    s_df = sql_sc.createDataFrame(df)
    vectorAss = VectorAssembler(inputCols=cols, outputCol="feature")
    vdf = vectorAss.transform(s_df)
    km = KMeans.train(vdf, k=2, maxIterations=10, runs=10, initializationMode="k-means||")
    model = kmeans.fit(vdf)
    cluster = model.clusterCenters()
    print(cluster)
I typed this code into the pyspark shell, and when it reached model = kmeans.fit(vdf) I got the following error:
    TypeError: Cannot convert type into Vector
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:275)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    17/02/26 23:31:58 ERROR Executor: Exception in task 6.0 in stage 23.0 (TID 113)
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
        raise TypeError("Cannot convert type %s into Vector" % type(l))
    TypeError: Cannot convert type into Vector
The data I'm using comes from: https://archive.ics.uci.edu/ml/machine-learning-databases/00292/Wholesale%20customers%20data.csv
Can someone tell me what is going wrong here and what I'm missing? I'd really appreciate any help.
Thank you!
Update: @[name hidden], the error I get is:

    >>> kmm = kmeans.fit(s_df)
    17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
    17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 5
    17/03/02 21:58:01 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:56193 in memory (size: 5.8 KB, free: 511.1 MB)
    17/03/02 21:58:01 INFO ContextCleaner: Cleaned accumulator 4
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/pipeline.py", line 69, in fit
        return self._fit(dataset)
      File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 133, in _fit
        java_model = self._fit_java(dataset)
      File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/ml/wrapper.py", line 130, in _fit_java
        return self._java_obj.fit(dataset._jdf)
      File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
      File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: u"cannot resolve 'features' given input columns: [Channel, Grocery, Fresh, Frozen, Detergents_Paper, Region, Delicassen, Milk];"
Answer:
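One possible reading of the two errors, offered as a sketch rather than a confirmed diagnosis: the posted code imports KMeans from pyspark.mllib, whose KMeans.train expects an RDD of vectors, yet it is handed a DataFrame, which would explain "Cannot convert type ... into Vector". The updated attempt appears to use the DataFrame-based pyspark.ml KMeans, which reads the column named by featuresCol ("features" by default). That column does not exist in s_df, and the assembler in the question writes to "feature" instead. The helper names below (FEATURE_COLS, missing_columns, fit_kmeans) are hypothetical, and the code assumes a Spark build that ships pyspark.ml.clustering.KMeans.

```python
# Hypothetical helpers; only VectorAssembler and KMeans come from pyspark itself.
FEATURE_COLS = ["Channel", "Region", "Fresh", "Milk", "Grocery",
                "Frozen", "Detergents_Paper", "Delicassen"]


def missing_columns(available, required):
    """Return the names in `required` that are absent from `available`."""
    present = set(available)
    return [name for name in required if name not in present]


def fit_kmeans(spark_df, feature_cols=None, k=2, max_iter=10):
    """Assemble the feature columns into 'features' and fit pyspark.ml KMeans.

    Assumes `spark_df` is a Spark DataFrame with numeric feature columns.
    """
    # Imported lazily so the column check above works without Spark installed.
    from pyspark.ml.clustering import KMeans
    from pyspark.ml.feature import VectorAssembler

    feature_cols = list(feature_cols or FEATURE_COLS)
    absent = missing_columns(spark_df.columns, feature_cols)
    if absent:
        raise ValueError("input DataFrame lacks columns: %s" % absent)

    # The assembler writes to "features", the same column KMeans is told to read.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    assembled = assembler.transform(spark_df)

    kmeans = KMeans(k=k, maxIter=max_iter, featuresCol="features")
    model = kmeans.fit(assembled)
    return model, assembled
```

In the pyspark shell this might be used as `model, vdf = fit_kmeans(s_df)` followed by `print(model.clusterCenters())`. Calling `missing_columns(s_df.columns, ["features"])` before fitting shows whether the column that the AnalysisException complains about is actually present.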