I'm trying to deploy Google Dataflow streaming for use in my machine learning pipeline, but I can't seem to deploy workers with a file already loaded into memory. Currently I have the job set up to pull a pickle file from a GCS bucket, load it into memory, and use it for model prediction. But this happens on every job cycle, i.e. it pulls from GCS every time a new object enters the pipeline, which means the current execution is far slower than it needs to be.
What I really need is a variable that is allocated in each worker node when the node is set up, and then used within the pipeline without being reloaded on every pipeline execution.
Is there a way, before the job is deployed, to perform a step like
with open('model.pkl', 'rb') as file:
    pickle_model = pickle.load(file)
but in my setup.py file?
##### based on - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/juliaset/setup.py

"""Setup.py module for the workflow's worker utilities.

All the workflow related code is gathered in a package that will be built as a
source distribution, staged in the staging area for the workflow being run and
then installed in the workers when they start running.

This behavior is triggered by specifying the --setup_file command line option
when running the workflow for remote execution.
"""

# pytype: skip-file

from __future__ import absolute_import
from __future__ import print_function

import subprocess
from distutils.command.build import build as _build  # type: ignore

import setuptools


# This class handles the pip install mechanism.
class build(_build):  # pylint: disable=invalid-name
  """A build command class that will be invoked during package install.

  The package built using the current setup.py will be staged and later
  installed in the worker using `pip install package'. This class will be
  instantiated during install for this specific scenario and will trigger
  running the custom commands specified.
  """
  sub_commands = _build.sub_commands + [('CustomCommands', None)]


# Each command is run in turn when the package is installed on a worker.
# (Note: this must be a single list; reassigning CUSTOM_COMMANDS per package
# would leave only the last pip install in effect.)
CUSTOM_COMMANDS = [
    ['pip', 'install', 'scikit-learn==0.23.1'],
    ['pip', 'install', 'google-cloud-storage'],
    ['pip', 'install', 'mlxtend'],
]


class CustomCommands(setuptools.Command):
  """A setuptools Command class able to run arbitrary commands."""

  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print('Running command: %s' % command_list)
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print('Command output: %s' % stdout_data)
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)


REQUIRED_PACKAGES = [
    'google-cloud-storage',
    'mlxtend',
    'scikit-learn==0.23.1',
]

setuptools.setup(
    name='ML pipeline',
    version='0.0.1',
    description='ML set workflow package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    })
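For reference, here is a minimal sketch of how a setup.py like this gets wired into the job at launch time: the setup_file pipeline option points at it, and Dataflow then builds the package and installs it on every worker. The project, region, and bucket values below are placeholders, not taken from my actual configuration.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; substitute your own project, region, and bucket.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/temp',
    streaming=True,
    setup_file='./setup.py',  # Dataflow builds and installs this package on each worker
)

with beam.Pipeline(options=options) as p:
    ...  # pipeline transforms go here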
Snippet of the current model-loading mechanism:
class MlModel(beam.DoFn):
    def __init__(self):
        self._model = None
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._storage = storage
        self._pkl = pkl
        self._pd = pd

    def process(self, element):
        if self._model is None:
            bucket = self._storage.Client().get_bucket(myBucket)
            blob = bucket.get_blob(myBlob)
            self._model = self._pkl.loads(blob.download_as_string())

        new_df = self._pd.read_json(element, orient='records').iloc[:, 3:-1]
        predict = self._model.predict(new_df)
        df = self._pd.DataFrame(data=predict, columns=["A", "B"])
        A = df.iloc[0]['A']
        B = df.iloc[0]['B']
        d = {'A': A, 'B': B}
        return [d]
Answer:
You can use the setup method of your MlModel DoFn: load your model there, then use it in the process method. (In the Python SDK these are plain methods on beam.DoFn; @Setup and @ProcessElement are the Java annotations.) setup is called once when each worker initializes the DoFn instance, not once per element, so the model is downloaded and unpickled only once per worker.
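A minimal sketch of what that looks like, keeping the names from your snippet (myBucket and myBlob remain placeholders for your bucket and blob names):

import apache_beam as beam


class MlModel(beam.DoFn):
    def setup(self):
        # setup() runs once when the worker initializes this DoFn instance,
        # so the download/unpickle cost is paid a single time, not per element.
        # Local imports keep the DoFn picklable without --save_main_session.
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        self._pd = pd
        # myBucket / myBlob are placeholders, as in the question.
        bucket = storage.Client().get_bucket(myBucket)
        blob = bucket.get_blob(myBlob)
        self._model = pkl.loads(blob.download_as_string())

    def process(self, element):
        # Reuses the model and pandas handle loaded in setup().
        new_df = self._pd.read_json(element, orient='records').iloc[:, 3:-1]
        predict = self._model.predict(new_df)
        df = self._pd.DataFrame(data=predict, columns=["A", "B"])
        return [{'A': df.iloc[0]['A'], 'B': df.iloc[0]['B']}]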
I've written a similar answer here before. Hope it helps!