我正在使用一个 UDF 来将一些机器学习模型应用于数据流。因为来自第三方库的 Model
类不能被 Flink 自动序列化,我使用了如下所示的两个变量:
class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction] with CheckpointedFunction { // 用于保存加载的模型 @transient private var models: HashMap[(String, String), Model] = _ // 用于序列化目的 @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _ ...}
其中:
models
保存加载的(运行中的)模型(由ModelDef
创建,基本是一个字符串)modelsBytes
是真实的(键控的)状态,它保存相同的模型,但以字节块的形式,以便检查点能够正常工作。
整体解决方案很简单(只需在恢复/保存模型时调用 fromBytes
/toBytes
),但我不知道这是否是一种常见/最佳实践。使用两个变量来保存本质上相同的东西看起来有些奇怪。例如,这里 你可以找到一个使用 TypeSerializer[Option[Model]]
的例子,看起来更整洁,但也更复杂实现。
所以,基本上:
-
我应该使用
TypeSerializer
方法,还是对于运行/序列化模型使用某种重复的状态是可以的? -
另外,如果你能指出一些关于 Flink 中自定义类型序列化的文档/示例,那就太好了,通常我觉得官方文档在这方面有点欠缺。
回答:
我可能会选择堆状态后端和自定义的 TypeSerializer
。
堆状态后端只会在检查点时序列化数据,其余时间保持数据原样。因此,使用该后端比自己管理映射几乎没有性能损失。然而,它将消除手动执行序列化和同步的需要。