我正在编写一个机器学习程序,包括以下组件:
-
一个共享的“经验池”,使用类似二叉树的数据结构。
-
N个模拟器进程。每个进程会不时地向池中添加一个“经验对象”。池负责平衡其树结构。
-
M个学习器进程,这些进程每隔一段时间从池中抽取一批“经验对象”并执行相应的学习过程。
我不知道实现上述内容的最佳方法。我没有使用Tensorflow,因此无法利用其并行能力。更具体地说,
- 我首先想到的是Python3内置的
multiprocessing
库。然而,与multithreading
不同的是,multiprocessing
模块无法让不同的进程更新同一个全局对象。我的直觉是应该使用服务器代理模型。有人能给我一个大致的框架代码作为起点吗? - MPI4py是否是一个更好的解决方案?
- 还有其他更适合的库吗?我已经查看了
celery
、disque
等。对于我的用例,如何适应它们并不明显。
回答:
根据评论,你真正寻找的是一种从一组执行CPU密集型任务的进程中更新共享对象的方法。CPU密集型任务使得multiprocessing
成为一个明显的选择——如果你的大部分工作是IO密集型的,multithreading
会是一个更简单的选择。
你的问题遵循一个简单的服务器-客户端模型:客户端使用服务器作为一个简单的有状态存储,不需要任何子进程之间的通信,也不需要进程同步。
因此,最简单的实现方法是:
- 启动一个包含服务器的独立进程。
- 在服务器逻辑中,提供方法来更新和读取单个对象。
- 将你的模拟器和学习器进程视为可以定期读取和更新全局状态的独立客户端。
从服务器的角度来看,客户端的身份并不重要——只有它们的动作才重要。
因此,可以通过在multiprocessing
中使用自定义管理器来实现,如下所示:
# server.pyfrom multiprocessing.managers import BaseManager# this represents the data structure you've already implemented.from ... import ExperienceTree# An important note: the way proxy objects work is by shared weak reference to# the object. If all of your workers die, it takes your proxy object with# it. Thus, if you have an instance, the instance is garbage-collected# once all references to it have been erased. I have chosen to sidestep # this in my code by using class variables and objects so that instances# are never used - you may define __init__, etc. if you so wish, but# just be aware of what will happen to your object once all workers are gone.class ExperiencePool(object): tree = ExperienceTree() @classmethod def update(cls, experience_object): ''' Implement methods to update the tree with an experience object. ''' cls.tree.update(experience_object) @classmethod def sample(cls): ''' Implement methods to sample the tree's experience objects. ''' return cls.tree.sample()# subclass base managerclass Server(BaseManager): pass# register the class you just created - now you can access an instance of # ExperiencePool using Server.Shared_Experience_Pool().Server.register('Shared_Experience_Pool', ExperiencePool)if __name__ == '__main__': # run the server on port 8080 of your own machine with Server(('localhost', 8080), authkey=b'none') as server_process: server_process.get_server().serve_forever()
现在,对于所有你的客户端,你可以这样做:
# client.py - you can always have a separate client file for a learner and a simulator.from multiprocessing.managers import BaseManagerfrom server import ExperiencePoolclass Server(BaseManager): passServer.register('Shared_Experience_Pool', ExperiencePool)if __name__ == '__main__': # run the server on port 8080 of your own machine forever. server_process = Server(('localhost', 8080), authkey=b'none') server_process.connect() experience_pool = server_process.Shared_Experience_Pool() # now do your own thing and call `experience_call.sample()` or `update` whenever you want.
然后你可以启动一个server.py
和任意数量的workers
。
这是最佳设计吗?
并非总是如此。你可能会遇到竞争条件,即如果学习器被迫与同时写入的模拟器节点竞争,它们可能会接收到陈旧或旧的数据。
如果你想确保优先考虑最新的写入,你可以额外使用锁,在你的模拟器尝试写入时使用,防止其他进程在写入完成之前进行读取。