我想在使用数据框作为输入的模型上使用多进程来获取预测结果。我有以下代码:
def perform_model_predictions(model, dataFrame, cores=4): try: with Pool(processes=cores) as pool: result = pool.map(model.predict, dataFrame) return result # return model.predict(dataFrame) except AttributeError: logging.error("AttributeError occurred", exc_info=True)
我遇到的错误是:
raise TypeError("sparse matrix length is ambiguous; use getnnz()"TypeError: sparse matrix length is ambiguous; use getnnz() or shape[0]
我认为问题出在将数据框作为第二个参数传递给pool.map
函数。任何建议或帮助将不胜感激。
回答:
诀窍是将你的数据框分成几块。map
期望得到一个将被model.predict
处理的对象列表。这里有一个完整的可运行示例,模型显然是模拟的:
import numpy as npimport pandas as pdfrom multiprocessing import Poolno_cores = 4large_df = pd.concat([pd.Series(np.random.rand(1111)), pd.Series(np.random.rand(1111))], axis = 1)chunk_size = len(large_df) // no_cores + no_coreschunks = [df_chunk for g, df_chunk in large_df.groupby(np.arange(len(large_df)) // chunk_size)]class model(object): @staticmethod def predict(df): return np.random.randint(0,2)def perform_model_predictions(model, dataFrame, cores): try: with Pool(processes=cores) as pool: result = pool.map(model.predict, dataFrame) return result # return model.predict(dataFrame) except AttributeError: logging.error("AttributeError occurred", exc_info=True)perform_model_predictions(model, chunks, no_cores)
请注意,这里选择的块数与核心数相匹配(或者任何你想分配的数量)。这样每个核心都能得到公平的分配,multiprocessing
不会在对象序列化上花费太多时间。
如果你想单独处理每一行(pd.Series
),序列化所花费的时间可能会是一个问题。在这种情况下,我建议使用joblib
并阅读其各种后端的文档。我没有写关于它的内容,因为看起来你想在pd.Dataframe
上调用predict。
额外警告
可能会发生的情况是,multiprocessing
不仅没有提高性能,反而使性能变差。这种情况在相当罕见的情况下发生,当你的model.predict
调用外部模块时,这些模块本身会生成线程。我在这里写过这个问题这里。长话短说,joblib
再次可能是答案。