我正在尝试使用 Flink 1.1.3 版本的 ALS 代码,使用以下命令:
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ -DarchetypeVersion=1.1.3 \ -DgroupId=org.apache.flink.quickstart \ -DartifactId=flink-scala-project \ -Dversion=0.1 \ -Dpackage=org.apache.flink.quickstart \ -DinteractiveMode=false
我参考了以下示例代码: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/als.html,并将数据集中的 Int 类型改为了 Long 类型
val env = ExecutionEnvironment.getExecutionEnvironmentval csvInput: DataSet[(Long, Long, Double)] = env.readCsvFile[(Long, Long, Double)]("tmp-contactos.csv")// 设置 ALS 学习器val als = ALS() .setIterations(10) .setNumFactors(10) .setBlocks(100)// 通过参数映射设置其他参数val parameters = ParameterMap() .add(ALS.Lambda, 0.9) .add(ALS.Seed, 42L)// 计算因子分解als.fit(csvInput, parameters)
但在运行时抛出了以下异常:
Exception in thread "main" java.lang.RuntimeException: There is no FitOperation defined for org.apache.flink.ml.recommendation.ALS which trains on a DataSet[(Long, Int, Double)]at org.apache.flink.ml.pipeline.Estimator$$anon$4.fit(Estimator.scala:85)at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)at org.apache.flink.quickstart.BatchJob$.main(BatchJob.scala:119)at org.apache.flink.quickstart.BatchJob.main(BatchJob.scala)
是否可以使用 Long 类型而不是 Int 类型?
我搜索过,发现了 0.9 版本的相关信息,但没有找到 1.1.3 版本的相关信息:
https://issues.apache.org/jira/browse/FLINK-2211
回答:
目前官方还不支持,但我已经创建了一个分支,修复了这个限制。你可以尝试这个分支。我会将它贡献给 Flink,相信它会在不久的将来成为主分支的一部分。