我在训练Spark MLlib的线性回归模型,但我认为我对库的实际操作部分有些不理解。
我有一个特征(NameItem
)和一个输出(Accumulator
)。前者是分类特征(如速度、温度等),后者是双精度类型数值。
训练集包含数百万条记录,它们之间没有线性相关性(我已经通过热图和相关性指数进行了检查)。
问题:我想通过线性回归根据NameItem
的值来估计Accumulator
的值,但我觉得我实际上并没有做到这一点。
提问:我该怎么做呢?
首先,我将数据集分为训练集
和数据集
:
(trainDF, testDF) = df.randomSplit((0.80, 0.20), seed=42)
之后,我尝试了一个管道方法,就像大多数教程中展示的那样:
1) 我对NameItem进行了索引
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")
2) 然后我对其进行了编码
encoderInput = [indexer.getOutputCol()]encoderOutput = ["EncodedItem"]encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)
3) 还进行了特征组合
assemblerInput = encoderOutputassembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")
之后,我继续进行实际的训练:
lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])lrModel = pipeline.fit(trainDF)
当我在测试集上应用预测时,我得到了以下结果:
predictions = lrModel.transform(testDF).show(5, False)
+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+|NameItem |Accumulator |CategorizedItem|EncodedItem |features |prediction |+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+|Speed |44000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,44000.0]) |44000.100892495786||Speed |245000.00000000 |265.0 |(688,[265],[1.0])|(689,[265,688],[1.0,245000.0]) |245000.09963708033||Temp |4473860.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,4473860.0]) |4473859.874261986 ||Temp |6065.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,6065.0]) |6065.097757082314 ||Temp |10140.00000000 |66.0 |(688,[66],[1.0]) |(689,[66,688],[1.0,10140.0]) |10140.097731630483|+--------------+-----------------+---------------+-----------------+-------------------------------+------------------+only showing top 5 rows
为什么对于同一个分类特征(例如Temp
),我会得到三个不同的预测值呢?
尽管这些预测值与预期值非常接近,但我觉得有什么地方不对劲。
回答:
为什么对于同一个分类特征(例如
Temp
),我会得到三个不同的预测值呢?
这是因为你的输出Accumulator
不知何故进入了features
(当然不应该这样),所以模型只是“预测”(实际上是复制)了输入的一部分;这就是为什么预测结果如此“准确”…
看起来是VectorAssembler
搞乱了事情。实际上,你在这里并不真正需要VectorAssembler
,因为你实际上只有一个“单一”特征(即EncodedItem
中的独热编码稀疏向量)。这可能是VectorAssembler
在这里表现得如此的原因(它被要求“组合”一个单一特征),但无论如何,这应该是一个错误。
所以我的建议是去掉VectorAssembler
,并直接将EncodedItem
重命名为features
,即:
indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")encoderInput = [indexer.getOutputCol()]encoderOutput = ["features"] # 第一处更改encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, lr]) # 第二处更改lrModel = pipeline.fit(trainDF)
更新 (根据评论中的反馈)
我的Spark版本是1.4.4
遗憾的是,我无法重现这个问题,因为我没有访问Spark 1.4.4的权限,而这是你正在使用的版本。但我已经确认在最新的Spark版本2.4.4中它可以正常工作,这让我更加相信在v1.4版本中确实存在一些错误,但这些错误在后续版本中已经被解决了。
以下是在Spark 2.4.4中使用类似于你的虚拟数据进行的重现:
spark.version# '2.4.4'from pyspark.ml.feature import VectorAssembler, OneHotEncoderEstimator, StringIndexerfrom pyspark.ml.regression import LinearRegressionfrom pyspark.ml import Pipeline# 类似于你的虚拟数据:df = spark.createDataFrame([['Speed', 44000], ['Temp', 23000], ['Temp', 5000], ['Speed', 75000], ['Weight', 5300], ['Height', 34500], ['Weight', 6500]], ['NameItem', 'Accumulator'])df.show()# 结果:+--------+-----------+|NameItem|Accumulator|+--------+-----------+| Speed| 44000|| Temp| 23000|| Temp| 5000|| Speed| 75000|| Weight| 5300|| Height| 34500|| Weight| 6500|+--------+-----------+indexer = StringIndexer(inputCol="NameItem", outputCol="CategorizedItem", handleInvalid = "keep")encoderInput = [indexer.getOutputCol()]encoderOutput = ["EncodedItem"]encoder = OneHotEncoderEstimator(inputCols=encoderInput, outputCols=encoderOutput)assemblerInput = encoderOutputassembler = VectorAssembler(inputCols=assemblerInput, outputCol="features")lr = LinearRegression(labelCol="Accumulator")pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])lrModel = pipeline.fit(df) lrModel.transform(df).show() # 为了简单起见,在同一df上进行预测
最后transform
的结果是
+--------+-----------+---------------+-------------+-------------+------------------+|NameItem|Accumulator|CategorizedItem| EncodedItem| features| prediction|+--------+-----------+---------------+-------------+-------------+------------------+| Speed| 44000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|| Temp| 23000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|| Temp| 5000| 1.0|(4,[1],[1.0])|(4,[1],[1.0])|14000.000000000004|| Speed| 75000| 2.0|(4,[2],[1.0])|(4,[2],[1.0])| 59500.0|| Weight| 5300| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004|| Height| 34500| 3.0|(4,[3],[1.0])|(4,[3],[1.0])| 34500.0|| Weight| 6500| 0.0|(4,[0],[1.0])|(4,[0],[1.0])| 5900.000000000004| +--------+-----------+---------------+-------------+-------------+------------------+
从中可以看出:
features
现在不包括输出变量Accumulator
的值,正如它应该的那样;实际上,正如我上面所争论的,features
现在与EncodedItem
相同,这使得VectorAssembler
变得多余,正如我们所期望的那样,因为我们只有一个单一特征。- 对于相同的
NameItem
值,prediction
值现在是相同的,我们也希望它们是这样的,而且它们现在不太准确,因此更加现实。
因此,你的问题肯定与你使用的极其过时的Spark版本1.4.4有关。自从v1.4版本以来,Spark已经有了巨大的进步,你应该认真考虑更新…