通过Spark MLlib回归估计数值

我在训练Spark MLlib的线性回归模型,但我认为我对库的实际操作部分有些不理解。

我有一个特征(NameItem)和一个输出(Accumulator)。前者是分类特征(如速度、温度等),后者是双精度类型数值。

训练集包含数百万条记录,它们之间没有线性相关性(我已经通过热图和相关性指数进行了检查)。

问题:我想通过线性回归根据NameItem的值来估计Accumulator的值,但我觉得我实际上并没有做到这一点。

提问:我该怎么做呢?

首先,我将数据集分为训练集数据集

(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)

之后,我尝试了一个管道方法,就像大多数教程中展示的那样:

1) 我对NameItem进行了索引

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")

2) 然后我对其进行了编码

encoderInput = [indexer.getOutputCol()]encoderOutput = ["EncodedItem"]encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)

3) 还进行了特征组合

assemblerInput = encoderOutputassembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")

之后,我继续进行实际的训练

lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])lrModel = pipeline.fit(trainDF)

当我在测试集上应用预测时,我得到了以下结果:

predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+|NameItem      |Accumulator      |CategorizedItem|EncodedItem      |features                       |prediction        |+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+|Speed         |44000.00000000   |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0])  |44000.100892495786||Speed         |245000.00000000  |265.0          |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033||Temp          |4473860.00000000 |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 ||Temp          |6065.00000000    |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0])    |6065.097757082314 ||Temp          |10140.00000000   |66.0           |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0])   |10140.097731630483|+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+only showing top 5 rows

为什么对于同一个分类特征(例如Temp),我会得到三个不同的预测值呢?

尽管这些预测值与预期值非常接近,但我觉得有什么地方不对劲。


回答:

为什么对于同一个分类特征(例如Temp),我会得到三个不同的预测值呢?

这是因为你的输出Accumulator不知何故进入了features(当然不应该这样),所以模型只是“预测”(实际上是复制)了输入的一部分;这就是为什么预测结果如此“准确”…

看起来是VectorAssembler搞乱了事情。实际上,你在这里并不真正需要VectorAssembler,因为你实际上只有一个“单一”特征(即EncodedItem中的独热编码稀疏向量)。这可能是VectorAssembler在这里表现得如此的原因(它被要求“组合”一个单一特征),但无论如何,这应该是一个错误。

所以我的建议是去掉VectorAssembler,并直接将EncodedItem重命名为features,即:

indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")encoderInput = [indexer.getOutputCol()]encoderOutput = ["features"]  # 第一处更改encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, lr])  # 第二处更改lrModel = pipeline.fit(trainDF)

更新 (根据评论中的反馈)

我的Spark版本是1.4.4

遗憾的是,我无法重现这个问题,因为我没有访问Spark 1.4.4的权限,而这是你正在使用的版本。但我已经确认在最新的Spark版本2.4.4中它可以正常工作,这让我更加相信在v1.4版本中确实存在一些错误,但这些错误在后续版本中已经被解决了。

以下是在Spark 2.4.4中使用类似于你的虚拟数据进行的重现:

spark.version# '2.4.4'from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexerfrom pyspark.ml.regression import LinearRegressionfrom pyspark.ml import Pipeline# 类似于你的虚拟数据:df = spark.createDataFrame([['Speed', 44000],                             ['Temp', 23000],                             ['Temp', 5000],                             ['Speed', 75000],                             ['Weight', 5300],                             ['Height', 34500],                             ['Weight', 6500]],                             ['NameItem', 'Accumulator'])df.show()# 结果:+--------+-----------+|NameItem|Accumulator|+--------+-----------+|   Speed|      44000||    Temp|      23000||    Temp|       5000||   Speed|      75000||  Weight|       5300||  Height|      34500||  Weight|       6500|+--------+-----------+indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")encoderInput = [indexer.getOutputCol()]encoderOutput = ["EncodedItem"]encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)assemblerInput = encoderOutputassembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])lrModel = pipeline.fit(df) lrModel.transform(df).show() # 为了简单起见,在同一df上进行预测

最后transform的结果是

+--------+-----------+---------------+-------------+-------------+------------------+|NameItem|Accumulator|CategorizedItem|  EncodedItem|     features|        prediction|+--------+-----------+---------------+-------------+-------------+------------------+|   Speed|      44000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0||    Temp|      23000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004||    Temp|       5000|            1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004||   Speed|      75000|            2.0|(4,[2],[1.0])|(4,[2],[1.0])|           59500.0||  Weight|       5300|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004||  Height|      34500|            3.0|(4,[3],[1.0])|(4,[3],[1.0])|           34500.0||  Weight|       6500|            0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|   +--------+-----------+---------------+-------------+-------------+------------------+

从中可以看出:

  1. features现在包括输出变量Accumulator的值,正如它应该的那样;实际上,正如我上面所争论的,features现在与EncodedItem相同,这使得VectorAssembler变得多余,正如我们所期望的那样,因为我们只有一个单一特征。
  2. 对于相同的NameItem值,prediction值现在是相同的,我们也希望它们是这样的,而且它们现在不太准确,因此更加现实。

因此,你的问题肯定与你使用的极其过时的Spark版本1.4.4有关。自从v1.4版本以来,Spark已经有了巨大的进步,你应该认真考虑更新…

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注