在 Flink 中序列化复杂模型的最佳实践

我正在使用一个 UDF 来将一些机器学习模型应用于数据流。因为来自第三方库的 Model 类不能被 Flink 自动序列化,我使用了如下所示的两个变量:

class MyUDF extends KeyedCoProcessFunction[String, ModelDef, Data, Prediction]  with CheckpointedFunction {  // 用于保存加载的模型  @transient private var models: HashMap[(String, String), Model] = _  // 用于序列化目的  @transient private var modelsBytes: MapState[(String, String), Array[Bytes]] = _  ...}

其中:

  • models 保存加载的(运行中的)模型(由 ModelDef 创建,基本是一个字符串)
  • modelsBytes 是真实的(键控的)状态,它保存相同的模型,但以字节块的形式,以便检查点能够正常工作。

整体解决方案很简单(只需在恢复/保存模型时调用 fromBytes/toBytes),但我不知道这是否是一种常见/最佳实践。使用两个变量来保存本质上相同的东西看起来有些奇怪。例如,这里 你可以找到一个使用 TypeSerializer[Option[Model]] 的例子,看起来更整洁,但也更复杂实现。

所以,基本上:

  • 我应该使用 TypeSerializer 方法,还是对于运行/序列化模型使用某种重复的状态是可以的?

  • 另外,如果你能指出一些关于 Flink 中自定义类型序列化的文档/示例,那就太好了,通常我觉得官方文档在这方面有点欠缺。


回答:

我可能会选择堆状态后端和自定义的 TypeSerializer

堆状态后端只会在检查点时序列化数据,其余时间保持数据原样。因此,使用该后端比自己管理映射几乎没有性能损失。然而,它将消除手动执行序列化和同步的需要。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注