我在运行Flink(0.10-SNAPSHOT)的多元线性回归示例。我无法弄清楚如何提取权重(例如斜率和截距,beta0-beta1,或者你想怎么称呼它们都可以)。我对Scala不是很熟练,这可能是我的问题的一半原因。
感谢任何人提供的帮助。
object Job { def main(args: Array[String]) { // 设置执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data") val survivalLV = survival .map{tuple => val list = tuple.productIterator.toList val numList = list.map(_.asInstanceOf[String].toDouble) LabeledVector(numList(3), DenseVector(numList.take(3).toArray)) } val mlr = MultipleLinearRegression() .setStepsize(1.0) .setIterations(100) .setConvergenceThreshold(0.001) mlr.fit(survivalLV) println(mlr.toString()) // 这没有任何实际作用... println(mlr.weightsOption) // 这也没有作用。 }}
回答:
问题在于你只是构建了Flink作业(DAG),它会计算权重,但尚未执行。触发执行的最简单方法是使用collect
方法,它会将DataSet
的结果检索回你的客户端。
mlr.fit(survivalLV)val weights = mlr.weightsOption match { case Some(weights) => weights.collect() case None => throw new Exception("无法计算权重。")}println(weights)