我试图使用 Python 的 Scoop 库对从正态分布随机生成的 10000000 个数据点(4 个特征,1 个目标变量)进行并行线性回归。这里是代码:
import pandas as pd
import numpy as np
import random
from scoop import futures
import statsmodels.api as sm
from time import time
def linreg(vals):
global model
model = sm.OLS(y_vals,X_vals).fit()
return model
print(model.summary())
if __name__ == '__main__':
random.seed(42)
vals = pd.DataFrame(np.random.normal(loc = 3, scale = 100, size =(10000000,5)))
vals.columns = ['dep', 'ind1', 'ind2', 'ind3', 'ind4']
y_vals = vals['dep']
X_vals = vals[['ind1', 'ind2', 'ind3', 'ind4']]
bt = time()
model_vals = list(map(linreg, [1,2,3]))
mval = model_vals[0]
print(mval.summary())
serial_time = time() - bt
bt1 = time()
model_vals_1 = list(futures.map(linreg, [1,2,3]))
mval_1 = model_vals_1[0]
print(mval_1.summary())
parallel_time = time() - bt1
print(serial_time, parallel_time)
然而,在通过 Python 的标准 map 函数以串行方式生成回归摘要后,出现了错误:
Traceback (most recent call last): File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py”, line 193, in _run_module_as_main “main”, mod_spec) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py”, line 85, in _run_code exec(code, run_globals) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py”, line 302, in b.main() File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py”, line 92, in main self.run() File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py”, line 290, in run futures_startup() File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\bootstrap__main__.py”, line 271, in futures_startup run_name=”main” File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py”, line 64, in _startup result = _controller.switch(rootFuture, *args, **kargs) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py”, line 253, in runController raise future.exceptionValue File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop_control.py”, line 127, in runFuture future.resultValue = future.callable(*future.args, **future.kargs) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py”, line 263, in run_path pkg_name=pkg_name, script_name=fname) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py”, line 96, in _run_module_code mod_name, mod_spec, pkg_name, script_name) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\runpy.py”, line 85, in _run_code exec(code, run_globals) File “Scoop_map_linear_regression1.py”, line 33, in model_vals_1 = list(futures.map(linreg, [1,2,3])) File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py”, line 102, in _mapGenerator for future in _waitAll(*futures): File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py”, line 358, in _waitAll for f in _waitAny(future): File “C:\Users\niccolo.gentile\AppData\Local\Continuum\anaconda3\envs\tensorenviron\lib\site-packages\scoop\futures.py”, line 335, in _waitAny raise childFuture.exceptionValue NameError: name ‘y_vals’ is not defined
这意味着代码在 model_vals_1 = list(futures.map(linreg, [1,2,3]))
处停止了
请注意,为了能够并行运行代码,必须从命令行启动并指定 -m scoop 参数,像这样:
python -m scoop Scoop_map_linear_regression1.py
事实上,如果不带 -m scoop 参数启动,它将不会并行化,实际上会运行,但只是使用了两次内置的 Python map 函数(因此,以串行方式运行两次),正如警告中报告的那样。也就是说,如果在启动时不指定 -m scoop 参数,futures.map 将被 map 替换,而目标实际上是使用 futures.map 并行运行它。
此澄清是为了避免人们回答说他们通过简单地不带 -m scoop 参数启动代码来解决问题,就像在这里已经发生过的那样:
因此,由于这个原因,该问题被错误地搁置为非主题,因为不再可重现。
非常感谢,任何评论都非常受欢迎和欢迎。
回答:
解决方案是将 [1]
作为 futures.map 的第二个参数传递(但不一定是 map 的第二个参数)。
事实上,尽管 linreg 函数不使用传递给 map 的第二个参数,但它仍然决定了 linreg 函数将运行多少次。作为一个例子,考虑以下基本示例:
def welcome(x):
print('Hello world!')
if __name__ == '__main__':
a = list(map(welcome, [1,2]))
welcome 函数实际上不需要任何参数,但输出仍然会是
Hello world!
Hello world!
重复两次,也就是传递给第二个参数的列表的长度。
在这种特定情况下,这意味着线性回归将由 map 运行 3 次,尽管回归输出只会出现一次,因为摘要是在 map 之外调用的。
关键是,相反,不可能使用 futures.map 多次运行线性回归。问题是,显然,在第一次运行后,它实际上删除了使用的数据集,因此无法继续进行第二次和第三次运行,并因此在 Trace 末尾抛出
NameError: name ‘y_vals’ is not defined
通过导航到:scoop.futures 源代码 应该可以看到这一点
没有浏览所有内容,但我猜问题应该与 greenlet 切换器有关。