Spark ML – 从新数据元素创建特征向量以进行预测

tl;dr

我在Spark 2.10中拟合了一个LinearRegression模型 – 在使用StringIndexer和OneHotEncoder之后,我得到了一个大约44个元素的特征向量。对于我想进行预测的新数据,我如何从新数据元素创建一个特征向量?

更多细节

首先,这是一个完全虚构的例子,用于学习如何做这件事。使用带有以下字段的日志:

"elapsed_time", "api_name", "method", 和 "status_code"

我们将创建一个以elapsed_time为标签的模型,并使用其他字段作为我们的特征集。完整的代码将在下面共享。

步骤 – 简化版

  1. 将数据读入DataFrame
  2. 使用StringIndexer为每个特征建立索引
  3. 使用OneHotEncoder对索引后的特征进行独热编码
  4. 使用VectorAssembler创建我们的特征向量
  5. 将数据分割成训练集和测试集
  6. 拟合模型并对测试数据进行预测

结果很糟糕,但正如我所说,这是一个虚构的练习…

我需要学习如何做的事情

例如,如果一个新的日志条目进入流式应用程序,我该如何从新数据中创建特征向量并将其传递给predict()函数?

一个新的日志条目可能是这样的:

{api_name”:”/sample_api_1/v2″,”method”:”GET”,”status_code”:”200″,”elapsed_time”:39}

VectorAssembler之后

status_code_vector

(14,[0],[1.0])

api_name_vector

(27,[0],[1.0])

method_vector

(3,[0],[1.0])

features vector

(44,[0,14,41],[1.0,1.0,1.0])

代码

%sparkimport org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}import org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.ml.regression.LinearRegressionimport org.apache.spark.sql.DataFrameval logs = sc.textFile("/Users/z001vmk/data/sample_102M.txt")val dfLogsRaw: DataFrame = spark.read.json(logs)val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")// 创建包含我们关注字段的DF。val dfFeatures: DataFrame = dfLogsFiltered.select("elapsed_time", "api_name", "method", "status_code")// 虚构的目标:// 使用elapsed time作为我们的标签,给定特征api_name, status_code, & method。// 在小型(100Mb)数据集上训练模型// 能够预测给定类似于这个示例的新记录的elapsed_time:// --> {api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39}// 索引器val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")// 索引特征:val dfIndexed0: DataFrame = statusCodeIdxr.fit(dfFeatures).transform(dfFeatures)val dfIndexed1: DataFrame = apiNameIdxr.fit(dfIndexed0).transform(dfIndexed0)val indexed: DataFrame = methodIdxr.fit(dfIndexed1).transform(dfIndexed1)// OneHotEncodersval statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")// 编码特征向量val encoded0: DataFrame = statusCodeEncoder.transform(indexed)val encoded1: DataFrame = apiNameEncoder.transform(encoded0)val encoded: DataFrame = methodEncoder.transform(encoded1)// 限制我们的数据集到必要的元素:val dataset0 = encoded.select("elapsed_time", "status_code_vec", "api_name_vec", "method_vec").withColumnRenamed("elapsed_time", "label")// 组装特征向量val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec")).setOutputCol("features")val dataset1 = assembler.transform(dataset0)dataset1.show(5,false)// 准备训练数据集(可选):val dataset: DataFrame = dataset1.select("label", "features")dataset.show(3,false)val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2))// 创建我们的线性回归模型val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")val lrModel = lr.fit(training)val predictions = lrModel.transform(test)predictions.show(20,false)

如果您有兴趣,可以将这些代码粘贴到Zeppelin笔记本中。

总结

所以,我一直在寻找的是如何将新数据转换为大约35个元素的特征向量,并使用训练数据拟合的模型进行转换并获得预测。我怀疑模型本身或在这种情况下需要从StringIndexers中维护的元数据 – 但这是我找不到的。

非常乐意被指向文档或示例 – 所有帮助都将不胜感激。

谢谢!


回答:

在使用PipelineModel的道路上,这变得非常简单。感谢@tadamhicks让我尽早关注管道。

下面是一个更新的代码块,它基本上执行了与上面相同的模型创建、拟合和预测,但使用管道来完成,并且增加了一部分,我们在一个新创建的DataFrame上进行预测,以模拟如何对新数据进行预测。

可能有更清洁的方法来重命名/创建我们的标签列,但我们将这留作未来的改进。

%sparkimport org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}import org.apache.spark.ml.{Pipeline, PipelineModel}import org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.ml.regression.LinearRegressionimport org.apache.spark.sql.DataFrameval logs = sc.textFile("/data/sample_102M.txt")val dfLogsRaw: DataFrame = spark.read.json(logs)val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")                              .select("elapsed_time", "api_name", "method", "status_code","cache_status")                              .withColumnRenamed("elapsed_time", "label")val Array(training, test) = dfLogsFiltered.randomSplit(Array(0.8, 0.2))// 索引器val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")//"cache_status"val cacheStatusIdxr: StringIndexer = new StringIndexer().setInputCol("cache_status").setOutputCol("cache_status_idx").setHandleInvalid("skip")// OneHotEncodersval statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")val cacheStatusEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(cacheStatusIdxr.getOutputCol).setOutputCol("cache_status_vec")// 向量组装器val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec", "cache_status_vec")).setOutputCol("features")val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")val pipeline = new Pipeline().setStages(Array(statusCodeIdxr, apiNameIdxr, methodIdxr, cacheStatusIdxr, statusCodeEncoder, apiNameEncoder, methodEncoder, cacheStatusEncoder, assembler, lr))val plModel: PipelineModel = pipeline.fit(training)plModel.write.overwrite().save("/tmp/spark-linear-regression-model")plModel.transform(test).select("label", "prediction").show(5,false)val dataElement: String = """{"api_name":"/sample_api/v2","method":"GET","status_code":"200","cache_status":"MISS","elapsed_time":39}"""val newDataRDD = spark.sparkContext.makeRDD(dataElement :: Nil)val newData = spark.read.json(newDataRDD).withColumnRenamed("elapsed_time", "label")val loadedPlModel = PipelineModel.load("/tmp/spark-linear-regression-model")loadedPlModel.transform(newData).select("label", "prediction").show

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注