我刚开始接触Apache Spark,我有一个如下格式的文件:
Name Size Records File1 1,000 104,370 File2 950 91,780 File3 1,500 109,123 File4 2,170 113,888File5 2,000 111,974File6 1,820 110,666File7 1,200 106,771 File8 1,500 108,991 File9 1,000 104,007File10 1,300 107,037File11 1,900 111,109File12 1,430 108,051File13 1,780 110,006File14 2,010 114,449File15 2,017 114,889
这是我的样本/测试数据。我正在开发一个异常检测程序,需要测试其他具有相同格式但不同值的文件,并检测哪些文件在大小和记录值上有异常(如果另一文件的大小/记录与标准文件差异很大,或者大小和记录在各文件中不成比例)。我决定尝试不同的机器学习算法,并想从k-Means方法开始。我尝试将这个文件放入以下代码行:
KMeansModel model = kmeans.fit(file)
文件已经解析为Dataset变量。然而,我得到了一个错误,我相当确定这与文件的结构/模式有关。是否有方法在尝试将结构化/标记/组织的数据拟合到模型中时使用这些数据?
我得到的错误是:线程”main”中的异常 java.lang.IllegalArgumentException: 字段”features”不存在。
这是代码:
public class practice {public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Anomaly Detection").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); SparkSession spark = SparkSession .builder() .appName("Anomaly Detection") .getOrCreate();String day1 = "C:\\Users\\ZK0GJXO\\Documents\\day1.txt"; Dataset<Row> df = spark.read(). option("header", "true"). option("delimiter", "\t"). csv(day1); df.show(); KMeans kmeans = new KMeans().setK(2).setSeed(1L); KMeansModel model = kmeans.fit(df);}
}
谢谢
回答:
默认情况下,所有的Spark ML模型都在名为”features”的列上进行训练。可以通过setFeaturesCol方法指定不同的输入列名 http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/clustering/KMeans.html#setFeaturesCol(java.lang.String)
更新:
可以使用VectorAssembler将多个列组合成一个特征向量:
VectorAssembler assembler = new VectorAssembler().setInputCols(new String[]{"size", "records"}).setOutputCol("features"); Dataset<Row> vectorized_df = assembler.transform(df) KMeans kmeans = new KMeans().setK(2).setSeed(1L); KMeansModel model = kmeans.fit(vectorized_df);
可以进一步使用pipeline API流线化和链式这些特征转换 https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline