如何在内存中加载文件后部署Google Dataflow工作节点?

我在尝试部署Google Dataflow流式处理,以用于我的机器学习流水线,但似乎无法在内存中已加载文件的情况下部署工作节点。目前,我已经设置了作业从GCS存储桶中拉取一个pickle文件,加载到内存中,并用于模型预测。但这在每次作业周期中都会执行,即每次有新对象进入数据流水线时都会从GCS拉取——这意味着当前的执行速度远低于所需速度。

我真正需要的是,在每个工作节点设置时,在节点中分配一个变量。然后在流水线中使用该变量,而无需在每次执行流水线时重新加载。

在作业部署之前,有没有办法执行这样的步骤,像是

with open('model.pkl', 'rb') as file:   pickle_model = pickle.load(file)

但是在我的setup.py文件中实现?

##### based on - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py"""Setup.py module for the workflow's worker utilities.All the workflow related code is gathered in a package that will be built as asource distribution, staged in the staging area for the workflow being run andthen installed in the workers when they start running.This behavior is triggered by specifying the --setup_file command line optionwhen running the workflow for remote execution."""# pytype: skip-filefrom __future__ import absolute_importfrom __future__ import print_functionimport subprocessfrom distutils.command.build import build as _build  # type: ignoreimport setuptools# This class handles the pip install mechanism.class build(_build):  # pylint: disable=invalid-name    """A build command class that will be invoked during package install.    The package built using the current setup.py will be staged and later    installed in the worker using `pip install package'. This class will be    instantiated during install for this specific scenario and will trigger    running the custom commands specified.    """    sub_commands = _build.sub_commands + [('CustomCommands', None)]CUSTOM_COMMANDS = [['pip', 'install', 'scikit-learn==0.23.1']]CUSTOM_COMMANDS = [['pip', 'install', 'google-cloud-storage']]CUSTOM_COMMANDS = [['pip', 'install', 'mlxtend']]class CustomCommands(setuptools.Command):    """A setuptools Command class able to run arbitrary commands."""    def initialize_options(self):        pass        def finalize_options(self):        pass    def RunCustomCommand(self, command_list):        print('Running command: %s' % command_list)        p = subprocess.Popen(            command_list,            stdin=subprocess.PIPE,            stdout=subprocess.PIPE,            stderr=subprocess.STDOUT)        # Can use communicate(input='y\n'.encode()) if the command run requires        # some confirmation.        stdout_data, _ = p.communicate()        print('Command output: %s' % stdout_data)        if p.returncode != 0:            raise RuntimeError(                'Command %s failed: exit code: %s' % (command_list, p.returncode))    def run(self):        for command in CUSTOM_COMMANDS:            self.RunCustomCommand(command)REQUIRED_PACKAGES = [    'google-cloud-storage',    'mlxtend',    'scikit-learn==0.23.1',]setuptools.setup(    name='ML pipeline',    version='0.0.1',    description='ML set workflow package.',    install_requires=REQUIRED_PACKAGES,    packages=setuptools.find_packages(),    cmdclass={        'build': build,        'CustomCommands': CustomCommands,    })

当前ML加载机制的片段:

class MlModel(beam.DoFn):    def __init__(self):        self._model = None        from google.cloud import storage        import pandas as pd        import pickle as pkl        self._storage = storage        self._pkl = pkl        self._pd = pd            def process(self,element):        if self._model is None:            bucket = self._storage.Client().get_bucket(myBucket)            blob = bucket.get_blob(myBlob)            self._model = self._pkl.loads(blob.download_as_string())        new_df = self._pd.read_json(element, orient='records').iloc[:, 3:-1]         predict = self._model.predict(new_df)        df = self._pd.DataFrame(data=predict, columns=["A", "B"])        A = df.iloc[0]['A']        B = df.iloc[0]['B']        d = {'A':A, 'B':B}        return [d] 

回答:

您可以在MlModelDoFn方法中使用@Setup方法,在那里加载您的模型,然后在@Process方法中使用它。@Setup方法在每个工作节点初始化时调用一次。

我曾在这里写过一个类似的回答

希望对您有帮助

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注