I'm using Spark and want to train a machine learning model.
Because the results are poor, I would like to display the error the model makes at each epoch of training (on the training and test datasets).
I will then use this information to determine whether my model is underfitting or overfitting the data.
Question: How can I plot the learning curve of a model with Spark?
In the following example, I implemented my own evaluator and overrode the evaluate method to print the metrics I need, but only two values are displayed (maxIter = 1000).
MinimalRunnableCode.scala:
import org.apache.spark.SparkConf
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.SparkSession

object Min extends App {
  // Open a Spark session.
  val conf = new SparkConf()
    .setMaster("local")
    .set("spark.network.timeout", "800")
  val ss = SparkSession.builder
    .config(conf)
    .getOrCreate

  // Load the data.
  val data = ss.createDataFrame(ss.sparkContext.parallelize(
      List(
        (Vectors.dense(1, 2), 1),
        (Vectors.dense(1, 3), 2),
        (Vectors.dense(1, 2), 1),
        (Vectors.dense(1, 3), 2),
        (Vectors.dense(1, 2), 1),
        (Vectors.dense(1, 3), 2),
        (Vectors.dense(1, 2), 1),
        (Vectors.dense(1, 3), 2),
        (Vectors.dense(1, 2), 1),
        (Vectors.dense(1, 3), 2),
        (Vectors.dense(1, 4), 3)
      )
    ))
    .withColumnRenamed("_1", "features")
    .withColumnRenamed("_2", "label")

  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

  // Create the linear regression model.
  val lr = new LinearRegression().setMaxIter(1000)

  // Create the parameter grid used to train different versions of the linear model.
  val paramGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.001))
    .addGrid(lr.fitIntercept)
    .addGrid(lr.elasticNetParam, Array(0.5))
    .build()

  // Create the trainer using a validation split to evaluate which set of parameters performs best.
  val trainValidationSplit = new TrainValidationSplit()
    .setEstimator(lr)
    .setEvaluator(new CustomRegressionEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8) // 80% of the data is used for training, the remaining 20% for validation.

  // Run the train-validation split and choose the best set of parameters.
  var model = trainValidationSplit.fit(training)

  // Close the Spark session.
  ss.stop()
}
CustomRegressionEvaluator.scala:
import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

final class CustomRegressionEvaluator(override val uid: String)
  extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("regEval"))

  def checkNumericType(
      schema: StructType,
      colName: String,
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +
      s"NumericType but was actually of type $actualDataType.$message")
  }

  def checkColumnTypes(
      schema: StructType,
      colName: String,
      dataTypes: Seq[DataType],
      msg: String = ""): Unit = {
    val actualDataType = schema(colName).dataType
    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""
    require(dataTypes.exists(actualDataType.equals),
      s"Column $colName must be of type equal to one of the following types: " +
        s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")
  }

  var i = 0 // counts the number of times evaluate is called

  override def evaluate(dataset: Dataset[_]): Double = {
    val schema = dataset.schema
    checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))
    checkNumericType(schema, $(labelCol))

    val predictionAndLabels = dataset
      .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))
      .rdd
      .map { case Row(prediction: Double, label: Double) => (prediction, label) }

    val metrics = new RegressionMetrics(predictionAndLabels)
    val metric = "mae" match {
      case "rmse" => metrics.rootMeanSquaredError
      case "mse" => metrics.meanSquaredError
      case "r2" => metrics.r2
      case "mae" => metrics.meanAbsoluteError
    }
    println(s"$i $metric") // print the metric
    i = i + 1 // update the counter
    metric
  }

  override def copy(extra: ParamMap): RegressionEvaluator = defaultCopy(extra)
}

object RegressionEvaluator extends DefaultParamsReadable[RegressionEvaluator] {
  override def load(path: String): RegressionEvaluator = super.load(path)
}

private[ml] trait HasPredictionCol extends Params {
  /**
   * Param for prediction column name.
   * @group param
   */
  final val predictionCol: Param[String] = new Param[String](this, "predictionCol", "prediction column name")
  setDefault(predictionCol, "prediction")

  /** @group getParam */
  final def getPredictionCol: String = $(predictionCol)
}

private[ml] trait HasLabelCol extends Params {
  /**
   * Param for label column name.
   * @group param
   */
  final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")
  setDefault(labelCol, "label")

  /** @group getParam */
  final def getLabelCol: String = $(labelCol)
}
Answer:
For the specific case of LinearRegression, and of any other algorithm that supports an objective history, here is a possible solution (in this case, LinearRegressionTrainingSummary does the job).
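As a quick aside, if you fit a single LinearRegressionModel outside of any tuning stage, the per-iteration loss is available straight from the training summary. A minimal sketch, assuming the lr estimator and training DataFrame from the question:

import org.apache.spark.ml.regression.LinearRegressionModel

// Minimal sketch: fit one model and read its per-iteration objective (loss) values.
// `lr` and `training` are assumed to be defined as in the question.
val single: LinearRegressionModel = lr.fit(training)
single.summary.objectiveHistory.zipWithIndex.foreach {
  case (loss, iter) => println(s"iteration $iter: objective $loss")
}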
First, let's create a minimal verifiable and complete example:
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()

import org.apache.spark.ml.evaluation.RegressionEvaluator
import spark.implicits._

val data = {
  val tmp = LinearDataGenerator.generateLinearRDD(
    spark.sparkContext,
    nexamples = 10000,
    nfeatures = 4,
    eps = 0.05
  ).toDF
  MLUtils.convertVectorColumnsToML(tmp, "features")
}
As you can see, using a data generator is the recommended way to produce test data for spark-mllib or spark-ml.
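If you want to convince yourself of what the generator produced, a quick sanity check helps; a short sketch (the expected columns are stated in the comment, not a captured run):

// LinearDataGenerator emits LabeledPoint-style rows, so the DataFrame
// should contain `label` and `features` columns.
data.printSchema()
data.show(3, truncate = false)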
Now, let's train a linear regressor:
// Create the linear regression model.
val lr = new LinearRegression().setMaxIter(1000)

// The following lines will create two sets of parameters.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.001))
  .addGrid(lr.fitIntercept)
  .addGrid(lr.elasticNetParam, Array(0.5))
  .build()

// Create the trainer using a validation split to evaluate which set of parameters performs best.
// I'm using the regular RegressionEvaluator here.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.8) // 80% of the data is used for training, the remaining 20% for validation.

// To retrieve the sub-models, make sure to set collectSubModels to true before fitting.
trainValidationSplit.setCollectSubModels(true)

// Run the train-validation split and choose the best set of parameters.
var model = trainValidationSplit.fit(data)
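Incidentally, the fitted TrainValidationSplitModel also records the validation metric computed for each parameter combination, which pairs nicely with the objective histories below. A short sketch:

// One validation metric per ParamMap in the grid; RegressionEvaluator
// defaults to RMSE, computed on the 20% validation split.
model.getEstimatorParamMaps.zip(model.validationMetrics).foreach {
  case (pm, metric) => println(s"$pm => $metric")
}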
Now that our model is trained, all we need to do is retrieve the objective history.
The next part requires a bit of gymnastics between the model and the sub-model object parameters. If you have a Pipeline or something similar, this code needs to be modified, so use it carefully (see the sketch after the following snippet for the Pipeline case). It's just an example:
val objectiveHist = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      val idx: Seq[Int] = 1 until history.length
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, idx.zip(history).toMap)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "objectiveHistory")
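For the Pipeline case mentioned above, each sub-model would be a PipelineModel, so you would first have to pull out the regression stage before reading its summary. A hypothetical sketch (the stage position is an assumption about your pipeline layout):

import org.apache.spark.ml.PipelineModel

// Hypothetical helper: extract the LinearRegressionModel stage from a
// PipelineModel sub-model. Assumes the regression is the last stage.
def lrStage(pm: PipelineModel): LinearRegressionModel =
  pm.stages.last.asInstanceOf[LinearRegressionModel]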
Now we can inspect these metrics:
objectiveHist.show(false)
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |regParam|elasticNetParam|fitIntercept|objectiveHistory                                                                                       |
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
// |0.001   |0.5            |true        |[1 -> 0.4999999999999999, 2 -> 0.4038796441909531, 3 -> 0.02659222058006269, 4 -> 0.026592220340980147]|
// |0.001   |0.5            |false       |[1 -> 0.5000637621421942, 2 -> 0.4039303922115196, 3 -> 0.026592220673025396, 4 -> 0.02659222039347222]|
// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+
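Since the original goal is to plot the learning curve, you can flatten the map column into (iteration, loss) rows and hand them to whatever plotting tool you use. A minimal sketch (exploding a map column yields `key` and `value` columns):

import org.apache.spark.sql.functions.explode

// One row per (iteration, loss) pair, ready to collect or export for plotting.
val curve = objectiveHist
  .select($"regParam", $"elasticNetParam", $"fitIntercept", explode($"objectiveHistory"))
  .withColumnRenamed("key", "iteration")
  .withColumnRenamed("value", "loss")
  .orderBy($"fitIntercept", $"iteration")

// For example, write a single CSV to load into your plotting library of choice:
curve.coalesce(1).write.option("header", "true").csv("learning_curve")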
You can notice that the training process actually stops after 4 iterations.
If you want just the number of iterations, you can do the following instead:
val objectiveHist2 = spark.sparkContext.parallelize(
  model.subModels.zip(model.getEstimatorParamMaps).map {
    case (m: LinearRegressionModel, pm: ParamMap) =>
      val history: Array[Double] = m.summary.objectiveHistory
      // regParam, elasticNetParam, fitIntercept
      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {
        case Seq(x, y, z) => (x._2, y._2, z._2)
      }
      (parameters._1, parameters._2, parameters._3, history.size)
  }).toDF("regParam", "elasticNetParam", "fitIntercept", "iterations")
For demonstration purposes, I changed the number of features in the generator (nfeatures = 100):
objectiveHist2.show
// +--------+---------------+------------+----------+
// |regParam|elasticNetParam|fitIntercept|iterations|
// +--------+---------------+------------+----------+
// |   0.001|            0.5|        true|        11|
// |   0.001|            0.5|       false|        11|
// +--------+---------------+------------+----------+
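As a final shortcut, LinearRegressionTrainingSummary also exposes the iteration count directly, so you can skip the objectiveHistory bookkeeping when that is all you need:

// totalIterations reports how many iterations the solver actually ran.
model.subModels.foreach {
  case m: LinearRegressionModel => println(m.summary.totalIterations)
}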