我正在尝试创建一个Web应用程序,该应用程序接收来自POST请求的输入,并根据该输入提供一些机器学习预测结果。
由于预测模型相当复杂,我不希望用户等待计算完成。相反,我将繁重的计算委托给Celery任务,用户稍后可以检查结果。
我使用的是带有Celery、Redis和Flower的简单Flask应用程序。
我的view.py
文件如下:
@ns.route('predict/')class Predict(Resource): ... def post(self): ... do_categorize(data) return jsonify(success=True)
我的tasks.py
文件大致如下:
from ai.categorizer import Categorizercategorizer = Categorizer( model_path='category_model.h5', tokenizer_path='tokenize.joblib', labels_path='labels.joblib')@task()def do_categorize(data): result = categorizer.predict(data) print(result) # 将结果写入数据库 ...
我的Categorizer
类中的predict()
方法如下:
def predict(self, value): K.set_session(self.sess) with self.sess.as_default(): with self.graph.as_default(): prediction = self.model.predict(np.asarray([value], dtype='int64')) return prediction
我这样运行Celery:
celery worker -A app.celery --loglevel=DEBUG
过去几天我遇到的问题是categorizer.predict(data)
调用在执行过程中挂起。
我尝试在post方法中运行categorizer.predict(data)
,它可以正常工作。但如果我将其放入Celery任务中,它就停止工作了。没有控制台日志,如果我尝试调试,它会在.predict()
处冻结。
我的问题是:
- 如何解决这个问题?
- 工作进程是否有内存或CPU限制?
- Celery任务是执行这种繁重计算的“正确”方式吗?
- 如何调试这个问题?我做错了什么?
- 在文件顶部初始化模型是否正确?
回答:
多亏了这个Stack Overflow问题,我找到了解决我问题的答案:
事实证明,对于Keras来说,使用Threads
池而不是默认的Process
更好。
幸运的是,Celery 4.4重新引入了Threads
池不久之前。你可以在Celery 4.4变更日志中阅读更多信息:
线程任务池
我们重新引入了使用concurrent.futures.ThreadPoolExecutor的线程任务池。
之前的线程任务池是实验性的。此外,它基于已经过时的threadpool包。
你可以通过将worker_pool设置为‘threads’或在celery worker命令中传递–pool threads来使用新的线程任务池。
现在你可以使用线程而不是进程进行池化操作。
celery worker -A your_application --pool threads --loginfo=INFO
如果你不能使用最新版本的Celery,你可以使用gevent
包:
pip install geventcelery worker -A your_application --pool gevent --loginfo=INFO