我在我们的机器学习管道中开始使用Dagster,并且遇到了一些基本问题,我在想我是不是忽略了一些简单的东西,还是这就是它的本来面貌…
假设我有一个简单的机器学习管道:
加载原始数据 --> 将数据处理成表格 --> 分割训练/测试 --> 训练模型 --> 评估模型。
在Dagster中,线性模型是直接了当的。但如果我想添加一个小循环,比如为了交叉验证的目的:
加载原始数据 --> 将数据处理成表格 --> 分成k个折叠,并且对于每个折叠: - 折叠1: 训练模型 --> 评估 - 折叠2: 训练模型 --> 评估 - 折叠3: 训练模型 --> 评估 --> 总结交叉验证结果。
在Dagster中,有没有一个简洁美观的方法来实现这个?我的做法是:
加载原始数据 --> 将数据处理成表格 --> 分成K个折叠 --> 选择折叠k --> 训练模型 --> 评估模型
将折叠”k”作为管道的输入参数。然后运行管道K次。
我在这里错过了什么吗?
回答:
是的,Dagster确实支持在一个单一管道内,让solid分散成多个solid,然后再汇聚到一个接收solid(即总结结果)。这里是一些示例代码和在dagit中的相应dag可视化(完整dag和放大视图)。
@soliddef load_raw_data(_): yield Output('loaded_data')@soliddef process_data_into_table(_, raw_data): yield Output(raw_data)@solid( output_defs=[ OutputDefinition(name='fold_one', dagster_type=int, is_required=True), OutputDefinition(name='fold_two', dagster_type=int, is_required=True), ],)def split_into_two_folds(_, table): yield Output(1, 'fold_one') yield Output(2, 'fold_two')@soliddef train_fold(_, fold): yield Output('model')@soliddef evaluate_fold(_, model): yield Output('compute_result')@composite_soliddef process_fold(fold): return evaluate_fold(train_fold(fold))@soliddef summarize_results(context, fold_1_result, fold_2_result): yield Output('summary_stats')@pipelinedef ml_pipeline(): fold_one, fold_two = split_into_two_folds(process_data_into_table(load_raw_data())) process_fold_one = process_fold.alias('process_fold_one') process_fold_two = process_fold.alias('process_fold_two') summarize_results(process_fold_one(fold_one), process_fold_two(fold_two))
在示例代码中,我们使用别名以便为每个折叠重用相同的逻辑。我们还将处理每个折叠的逻辑整合在复合solid中。
另一种选择是直接以编程方式创建一个PipelineDefinition,但我建议使用上述方法。