Python并行编程模型

我正在编写一个机器学习程序,包括以下组件:

  1. 一个共享的“经验池”,使用类似二叉树的数据结构。

  2. N个模拟器进程。每个进程会不时地向池中添加一个“经验对象”。池负责平衡其树结构。

  3. M个学习器进程,这些进程每隔一段时间从池中抽取一批“经验对象”并执行相应的学习过程。

我不知道实现上述内容的最佳方法。我没有使用Tensorflow,因此无法利用其并行能力。更具体地说,

  • 我首先想到的是Python3内置的multiprocessing库。然而,与multithreading不同的是,multiprocessing模块无法让不同的进程更新同一个全局对象。我的直觉是应该使用服务器代理模型。有人能给我一个大致的框架代码作为起点吗?
  • MPI4py是否是一个更好的解决方案?
  • 还有其他更适合的库吗?我已经查看了celerydisque等。对于我的用例,如何适应它们并不明显。

回答:

根据评论,你真正寻找的是一种从一组执行CPU密集型任务的进程中更新共享对象的方法。CPU密集型任务使得multiprocessing成为一个明显的选择——如果你的大部分工作是IO密集型的,multithreading会是一个更简单的选择。

你的问题遵循一个简单的服务器-客户端模型:客户端使用服务器作为一个简单的有状态存储,不需要任何子进程之间的通信,也不需要进程同步。

因此,最简单的实现方法是:

  1. 启动一个包含服务器的独立进程。
  2. 在服务器逻辑中,提供方法来更新和读取单个对象。
  3. 将你的模拟器和学习器进程视为可以定期读取和更新全局状态的独立客户端。

从服务器的角度来看,客户端的身份并不重要——只有它们的动作才重要。

因此,可以通过在multiprocessing中使用自定义管理器来实现,如下所示:

# server.pyfrom multiprocessing.managers import BaseManager# this represents the data structure you've already implemented.from ... import ExperienceTree# An important note: the way proxy objects work is by shared weak reference to# the object. If all of your workers die, it takes your proxy object with# it. Thus, if you have an instance, the instance is garbage-collected# once all references to it have been erased. I have chosen to sidestep # this in my code by using class variables and objects so that instances# are never used - you may define __init__, etc. if you so wish, but# just be aware of what will happen to your object once all workers are gone.class ExperiencePool(object):    tree = ExperienceTree()    @classmethod    def update(cls, experience_object):        ''' Implement methods to update the tree with an experience object. '''        cls.tree.update(experience_object)    @classmethod    def sample(cls):        ''' Implement methods to sample the tree's experience objects. '''        return cls.tree.sample()# subclass base managerclass Server(BaseManager):    pass# register the class you just created - now you can access an instance of # ExperiencePool using Server.Shared_Experience_Pool().Server.register('Shared_Experience_Pool', ExperiencePool)if __name__ == '__main__':     # run the server on port 8080 of your own machine     with Server(('localhost', 8080), authkey=b'none') as server_process:         server_process.get_server().serve_forever()

现在,对于所有你的客户端,你可以这样做:

# client.py - you can always have a separate client file for a learner and a simulator.from multiprocessing.managers import BaseManagerfrom server import ExperiencePoolclass Server(BaseManager):     passServer.register('Shared_Experience_Pool', ExperiencePool)if __name__ == '__main__':     # run the server on port 8080 of your own machine forever.     server_process = Server(('localhost', 8080), authkey=b'none')     server_process.connect()     experience_pool = server_process.Shared_Experience_Pool()     # now do your own thing and call `experience_call.sample()` or `update` whenever you want. 

然后你可以启动一个server.py和任意数量的workers

这是最佳设计吗?

并非总是如此。你可能会遇到竞争条件,即如果学习器被迫与同时写入的模拟器节点竞争,它们可能会接收到陈旧或旧的数据。

如果你想确保优先考虑最新的写入,你可以额外使用,在你的模拟器尝试写入时使用,防止其他进程在写入完成之前进行读取。

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注