我在Kedro中有一个如下所示的管道:
from kedro.pipeline import Pipeline, nodefrom .nodes import *def foo(): return Pipeline([ node(a, inputs=["train_x", "test_x"], outputs=dict(bar_a="bar_a"), name="A"), node(b, inputs=["train_x", "test_x"], outputs=dict(bar_b="bar_b"), name="B"), node(c, inputs=["train_x", "test_x"], outputs=dict(bar_c="bar_c"), name="C"), node(d, inputs=["train_x", "test_x"], outputs=dict(bar_d="bar_d"), name="D"), ])
节点A、B和C的资源消耗不大,但它们需要一些时间,所以我想让它们并行运行。另一方面,节点D几乎使用了我所有的内存,如果它与其他节点同时执行就会失败。有没有办法可以告诉Kedro在执行节点D之前等待A、B和C完成,并且保持代码的整洁?
回答:
Kedro根据不同节点的输入/输出之间的相互依赖关系来确定执行顺序。在你的例子中,节点D不依赖于其他任何节点,因此无法保证执行顺序。同样,如果使用并行运行器,也无法确保节点D不会与A、B和C并行运行。
尽管如此,还是有一些变通方法可以实现特定的执行顺序。
1 [首选] 单独运行节点
与其使用kedro run --parallel
,你可以这样做:
kedro run --pipeline foo --node A --node B --node C --parallel; kedro run --pipeline foo --node D
这可以说是首选的解决方案,因为它不需要更改代码(这在你将来在不同机器上运行相同管道时是有利的)。如果你希望只有在A、B和C成功后才运行节点D,你可以使用&&
代替;
。如果运行逻辑变得更加复杂,你可以将其存储在Makefile/bash脚本中。
2 使用虚拟输入/输出
你还可以通过引入虚拟数据集来强制执行顺序。像这样:
def foo(): return Pipeline([ node(a, inputs=["train_x", "test_x"], outputs=[dict(bar_a="bar_a"), "a_done"], name="A"), node(b, inputs=["train_x", "test_x"], outputs=[dict(bar_b="bar_b"), "b_done"], name="B"), node(c, inputs=["train_x", "test_x"], outputs=[dict(bar_c="bar_c"), "c_done"], name="C"), node(d, inputs=["train_x", "test_x", "a_done", "b_done", "c_done"], outputs=dict(bar_d="bar_d"), name="D"), ])
虚拟数据集可以使用空列表。底层函数也必须返回/接受这些额外的参数。
这种方法的优点是kedro run --parallel
会立即产生所需的执行逻辑。缺点是它污染了节点和底层函数的定义。
如果你选择这条路,你还需要决定是否要将虚拟数据集存储在数据目录中(会进一步污染,但允许单独运行节点D)还是不存储(节点D不能单独运行)。