How can I deploy Google Dataflow workers with a file already loaded into memory?

I am trying to deploy Google Dataflow streaming for my machine learning pipeline, but I can't seem to deploy the workers with a file already loaded into memory. Currently, the job is set up to pull a pickle file from a GCS bucket, load it into memory, and use it for model prediction. But this happens on every job cycle, i.e. the file is pulled from GCS every time a new object enters the pipeline, which makes the current execution far slower than it needs to be.

What I really need is a way to assign a variable inside each worker node when the worker is set up, and then use that variable in the pipeline without re-loading it on every pipeline execution.

Is there a way to run a step like this before the job is deployed, something like

```python
with open('model.pkl', 'rb') as file:
    pickle_model = pickle.load(file)
```

but in my setup.py file?

```python
##### based on - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py
"""Setup.py module for the workflow's worker utilities.

All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.

This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build  # type: ignore

import setuptools


# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
    """A build command class that will be invoked during package install.

    The package built using the current setup.py will be staged and later
    installed in the worker using `pip install package'. This class will be
    instantiated during install for this specific scenario and will trigger
    running the custom commands specified.
    """
    sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Note: the original assigned CUSTOM_COMMANDS three times, so only the last
# assignment survived; a single list installs all three packages.
CUSTOM_COMMANDS = [
    ['pip', 'install', 'scikit-learn==0.23.1'],
    ['pip', 'install', 'google-cloud-storage'],
    ['pip', 'install', 'mlxtend'],
]


class CustomCommands(setuptools.Command):
    """A setuptools Command class able to run arbitrary commands."""

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print('Running command: %s' % command_list)
        p = subprocess.Popen(
            command_list,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        # Can use communicate(input='y\n'.encode()) if the command run requires
        # some confirmation.
        stdout_data, _ = p.communicate()
        print('Command output: %s' % stdout_data)
        if p.returncode != 0:
            raise RuntimeError(
                'Command %s failed: exit code: %s' % (command_list, p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = [
    'google-cloud-storage',
    'mlxtend',
    'scikit-learn==0.23.1',
]

setuptools.setup(
    name='ML pipeline',
    version='0.0.1',
    description='ML set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })
```

Snippet of the current ML loading mechanism:

```python
class MlModel(beam.DoFn):
    def __init__(self):
        self._model = None
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        if self._model is None:
            bucket = self._storage.Client().get_bucket(myBucket)
            blob = bucket.get_blob(myBlob)
            self._model = self._pkl.loads(blob.download_as_string())
        new_df = self._pd.read_json(element, orient='records').iloc[:, 3:-1]
        predict = self._model.predict(new_df)
        df = self._pd.DataFrame(data=predict, columns=["A", "B"])
        A = df.iloc[0]['A']
        B = df.iloc[0]['B']
        d = {'A': A, 'B': B}
        return [d]
```

Answer:

You can use the `setup()` lifecycle method on your `MlModel` DoFn: load the model there and then use it in `process()`. `setup()` is called once per DoFn instance when the worker initializes it, so the model stays in memory across elements instead of being re-fetched per element.

I have written a similar answer here before.

Hope it helps.
