Spark: 使用Spark绘制模型的学习曲线

我正在使用Spark,并希望训练一个机器学习模型。

由于结果不佳,我想显示模型在训练的每个周期(在训练和测试数据集上)所犯的错误。

然后我将使用这些信息来确定我的模型是否在数据上欠拟合或过拟合。

问题: 如何使用Spark绘制模型的学习曲线?

在下面的例子中,我实现了自己的评估器,并重写了evaluate方法以打印我需要的指标,但只显示了两个值(maxIter = 1000)。

MinimalRunnableCode.scala:

import org.apache.spark.SparkConfimport org.apache.spark.ml.linalg.Vectorsimport org.apache.spark.ml.regression.LinearRegressionimport org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}import org.apache.spark.sql.SparkSessionobject Min extends App {  // 打开Spark会话。  val conf = new SparkConf()    .setMaster("local")    .set("spark.network.timeout", "800")  val ss = SparkSession.builder    .config(conf)    .getOrCreate  // 加载数据。  val data = ss.createDataFrame(ss.sparkContext.parallelize(      List(        (Vectors.dense(1, 2), 1),        (Vectors.dense(1, 3), 2),        (Vectors.dense(1, 2), 1),        (Vectors.dense(1, 3), 2),        (Vectors.dense(1, 2), 1),        (Vectors.dense(1, 3), 2),        (Vectors.dense(1, 2), 1),        (Vectors.dense(1, 3), 2),        (Vectors.dense(1, 2), 1),        (Vectors.dense(1, 3), 2),        (Vectors.dense(1, 4), 3)      )    ))    .withColumnRenamed("_1", "features")    .withColumnRenamed("_2", "label")  val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)  // 创建线性回归模型。  val lr = new LinearRegression().setMaxIter(1000)  // 创建参数网格,用于训练线性模型的不同版本。  val paramGrid = new ParamGridBuilder()    .addGrid(lr.regParam, Array(0.001))    .addGrid(lr.fitIntercept)    .addGrid(lr.elasticNetParam, Array(0.5))    .build()  // 使用验证分割创建训练器,以评估哪组参数表现最佳。  val trainValidationSplit = new TrainValidationSplit()    .setEstimator(lr)    .setEvaluator(new CustomRegressionEvaluator)    .setEstimatorParamMaps(paramGrid)    .setTrainRatio(0.8) // 80%的数据用于训练,剩余20%用于验证。  // 运行训练验证分割,并选择最佳参数集。  var model = trainValidationSplit.fit(training)  // 关闭Spark会话。  ss.stop()}

CustomRegressionEvaluator.scala:

import org.apache.spark.ml.evaluation.{Evaluator, RegressionEvaluator}import org.apache.spark.ml.param.{Param, ParamMap, Params}import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}import org.apache.spark.mllib.evaluation.RegressionMetricsimport org.apache.spark.sql.{Dataset, Row}import org.apache.spark.sql.functions._import org.apache.spark.sql.types._final class CustomRegressionEvaluator (override val uid: String) extends Evaluator with HasPredictionCol with HasLabelCol with DefaultParamsWritable {  def this() = this(Identifiable.randomUID("regEval"))  def checkNumericType(                        schema: StructType,                        colName: String,                        msg: String = ""): Unit = {    val actualDataType = schema(colName).dataType    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""    require(actualDataType.isInstanceOf[NumericType], s"Column $colName must be of type " +      s"NumericType but was actually of type $actualDataType.$message")  }  def checkColumnTypes(                        schema: StructType,                        colName: String,                        dataTypes: Seq[DataType],                        msg: String = ""): Unit = {    val actualDataType = schema(colName).dataType    val message = if (msg != null && msg.trim.length > 0) " " + msg else ""    require(dataTypes.exists(actualDataType.equals),      s"Column $colName must be of type equal to one of the following types: " +        s"${dataTypes.mkString("[", ", ", "]")} but was actually of type $actualDataType.$message")  }  var i = 0 // 计数evaluate方法被调用的次数  override def evaluate(dataset: Dataset[_]): Double = {    val schema = dataset.schema    checkColumnTypes(schema, $(predictionCol), Seq(DoubleType, FloatType))    checkNumericType(schema, $(labelCol))    val predictionAndLabels = dataset      .select(col($(predictionCol)).cast(DoubleType), col($(labelCol)).cast(DoubleType))      .rdd      .map { case Row(prediction: Double, label: Double) => (prediction, label) }    val metrics = new RegressionMetrics(predictionAndLabels)    val metric = "mae" match {      case "rmse" => metrics.rootMeanSquaredError      case "mse" => metrics.meanSquaredError      case "r2" => metrics.r2      case "mae" => metrics.meanAbsoluteError    }    println(s"$i $metric") // 打印指标    i = i + 1 // 更新计数器    metric  }  override def copy(extra: ParamMap): RegressionEvaluator = defaultCopy(extra)}object RegressionEvaluator extends DefaultParamsReadable[RegressionEvaluator] {  override def load(path: String): RegressionEvaluator = super.load(path)}private[ml] trait HasPredictionCol extends Params {  /**    * Param for prediction column name.    * @group param    */  final val predictionCol: Param[String] = new Param[String](this, "predictionCol", "prediction column name")  setDefault(predictionCol, "prediction")  /** @group getParam */  final def getPredictionCol: String = $(predictionCol)}private[ml] trait HasLabelCol extends Params {  /**    * Param for label column name.    * @group param    */  final val labelCol: Param[String] = new Param[String](this, "labelCol", "label column name")  setDefault(labelCol, "label")  /** @group getParam */  final def getLabelCol: String = $(labelCol)}

回答:

对于支持目标历史LinearRegression和任何其他算法的特定情况,这里有一个可能的解决方案(在这种情况下,LinearRegressionTrainingSummary可以胜任)。

首先,让我们创建一个最小可验证且完整的示例

import org.apache.spark.ml.param.ParamMapimport org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}import org.apache.spark.sql.SparkSessionval spark: SparkSession = SparkSession.builder().getOrCreate()import org.apache.spark.ml.evaluation.RegressionEvaluatorimport spark.implicits._val data = {  val tmp = LinearDataGenerator.generateLinearRDD(    spark.sparkContext,    nexamples = 10000,    nfeatures = 4,    eps = 0.05  ).toDF  MLUtils.convertVectorColumnsToML(tmp, "features")}

如您所见,当您想要为spark-mllibspark-ml生成测试数据时,建议使用数据生成器。

现在,让我们训练一个线性回归器:

// 创建线性回归模型。val lr = new LinearRegression().setMaxIter(1000)// 以下行将创建两组参数val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.001)).addGrid(lr.fitIntercept).addGrid(lr.elasticNetParam, Array(0.5)).build()// 使用验证分割创建训练器,以评估哪组参数表现最佳。// 我在这里使用的是常规的RegressionEvaluatorval trainValidationSplit = new TrainValidationSplit()  .setEstimator(lr)  .setEvaluator(new RegressionEvaluator)  .setEstimatorParamMaps(paramGrid)  .setTrainRatio(0.8) // 80%的数据用于训练,剩余20%用于验证。// 要检索子模型,请确保在拟合之前将collectSubModels设置为true。trainValidationSplit.setCollectSubModels(true)// 运行训练验证分割,并选择最佳参数集。var model = trainValidationSplit.fit(data)

现在我们的模型已经训练完毕,我们只需要获取目标历史即可。

接下来的部分需要在模型和子模型对象参数之间进行一些操作。

如果您有一个Pipeline或类似的东西,这段代码需要修改,因此请谨慎使用。这只是一个示例:

val objectiveHist = spark.sparkContext.parallelize(  model.subModels.zip(model.getEstimatorParamMaps).map {    case (m: LinearRegressionModel, pm: ParamMap) =>      val history: Array[Double] = m.summary.objectiveHistory      val idx: Seq[Int] = 1 until history.length      // regParam, elasticNetParam, fitIntercept      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {        case Seq(x, y, z) => (x._2, y._2, z._2)      }      (parameters._1, parameters._2, parameters._3, idx.zip(history).toMap)  }).toDF("regParam", "elasticNetParam", "fitIntercept", "objectiveHistory")

现在我们可以检查这些指标:

objectiveHist.show(false)// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+// |regParam|elasticNetParam|fitIntercept|objectiveHistory                                                                                       |// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+// |0.001   |0.5            |true        |[1 -> 0.4999999999999999, 2 -> 0.4038796441909531, 3 -> 0.02659222058006269, 4 -> 0.026592220340980147]|// |0.001   |0.5            |false       |[1 -> 0.5000637621421942, 2 -> 0.4039303922115196, 3 -> 0.026592220673025396, 4 -> 0.02659222039347222]|// +--------+---------------+------------+-------------------------------------------------------------------------------------------------------+

您可以注意到训练过程实际上在4次迭代后停止。

如果您只想要迭代次数,可以执行以下操作:

val objectiveHist2 = spark.sparkContext.parallelize(  model.subModels.zip(model.getEstimatorParamMaps).map {    case (m: LinearRegressionModel, pm: ParamMap) =>      val history: Array[Double] = m.summary.objectiveHistory      // regParam, elasticNetParam, fitIntercept      val parameters = pm.toSeq.map(pair => (pair.param.name, pair.value.toString)) match {        case Seq(x, y, z) => (x._2, y._2, z._2)      }      (parameters._1, parameters._2, parameters._3, history.size)  }).toDF("regParam", "elasticNetParam", "fitIntercept", "iterations")

为了演示,我在生成器中更改了特征数量(nfeatures = 100):

objectiveHist2.show// +--------+---------------+------------+----------+// |regParam|elasticNetParam|fitIntercept|iterations|// +--------+---------------+------------+----------+// |   0.001|            0.5|        true|        11|// |   0.001|            0.5|       false|        11|// +--------+---------------+------------+----------+

Related Posts

这是过拟合吗?

我有一个卷积神经网络(CNN),在训练数据上的表现非常…

如何在Keras中连接两个LSTM模型

我想用Keras创建一个包含两个LSTM层的模型。然而…

为什么t-SNE方法使用欧几里得距离来计算高维数据的相似性?

我尝试过其他距离度量方法,如切比雪夫距离或曼哈顿距离等…

预期得到2个值,但实际得到0个

您好,我想知道当预期接收两个值却什么也没得到时,该如何…

如何理解TensorFlow.js层API的输入输出张量的工作原理

我有一个分类问题,具有8个输入和1个输出。我创建了以下…

如何在TensorFlow.js中控制输出范围

我有这样一个模型: const hidden = tf…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注