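I'm learning multithreaded programming in Python and have run into the following problem: when I run tasks in threads, they never seem to execute in parallel, judging by the output I print. In addition, my function takes just as long to run as it did before I used the concurrent.futures library (ThreadPoolExecutor). I need to compute the gain of certain attributes over a dataset (I can't use other libraries). Since I have about 1024 attributes and the function takes about a minute to run (and I need to call it inside a for loop), I decided to split the attributes array into 10 parts (just as an example) and run the gain(attribute) function separately for each sub-array. So I did the following (some extraneous code omitted):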

    def calculate_gains(self):
        splited_attributes = np.array_split(self.attributes, 10)
        result = {}
        for atts in splited_attributes:
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future = executor.submit(self.calculate_gains_helper, atts)
                return_value = future.result()
                self.gains = {**self.gains, **return_value}

Here is the calculate_gains_helper function:

    def calculate_gains_helper(self, attributes):
        inter_result = {}
        for attribute in attributes:
            inter_result[attribute] = self.gain(attribute)
        return inter_result

Am I doing something wrong? I read some older posts but couldn't find anything useful. Any help would be greatly appreciated!
Answer:
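I ran into the same problem once and solved it by moving the loop inside the ThreadPoolExecutor context. Otherwise, each iteration has to wait for its context to finish before the next one can start.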
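The waiting behaviour described above can be observed directly. The following is a minimal sketch, assuming a hypothetical `slow_task` that only sleeps: `submit()` returns immediately, but leaving the `with` block does not happen until the submitted task has finished.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def slow_task():
    # Hypothetical stand-in for real work.
    sleep(0.2)
    return "done"


start = perf_counter()
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    submitted_after = perf_counter() - start  # submit() does not block
# Leaving the with block calls executor.shutdown(wait=True),
# so this line is reached only once slow_task has finished.
finished_after = perf_counter() - start

print(f"submit returned after {submitted_after:.2f}s")
print(f"with block exited after {finished_after:.2f}s, done={future.done()}")
```

This is why a `with` block placed inside a loop turns every iteration into a blocking step.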
Here is a way that might fix your code:
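
    def calculate_gains(self):
        splited_attributes = np.array_split(self.attributes, 10)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit every chunk first so the pool can run them concurrently.
            futures = [
                executor.submit(self.calculate_gains_helper, atts)
                for atts in splited_attributes
            ]
            # Collect results only after all chunks are submitted; calling
            # future.result() right after submit() would block on each chunk in turn.
            for future in concurrent.futures.as_completed(futures):
                self.gains = {**self.gains, **future.result()}
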
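As an alternative, `Executor.map` schedules all chunks up front and yields their results in order, which avoids handling futures by hand. The sketch below is self-contained and only illustrative: the `GainCalculator` class, its placeholder `gain` method, and the `n_chunks` parameter are assumptions standing in for the code from the question, and the chunks are built with plain slicing instead of `np.array_split`.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


class GainCalculator:
    """Hypothetical stand-in for the class in the question."""

    def __init__(self, attributes):
        self.attributes = list(attributes)
        self.gains = {}

    def gain(self, attribute):
        # Placeholder for the real gain computation (assumption).
        sleep(0.01)
        return len(str(attribute))

    def calculate_gains_helper(self, attributes):
        return {attribute: self.gain(attribute) for attribute in attributes}

    def calculate_gains(self, n_chunks=10):
        # Simple strided split; np.array_split would work here as well.
        chunks = [self.attributes[i::n_chunks] for i in range(n_chunks)]
        with ThreadPoolExecutor() as executor:
            # map() schedules every chunk before any result is consumed.
            for partial in executor.map(self.calculate_gains_helper, chunks):
                self.gains.update(partial)
        return self.gains


calc = GainCalculator(f"attr{i}" for i in range(50))
gains = calc.calculate_gains()
print(len(gains))
```

Updating `self.gains` happens only in the calling thread, so no locking is needed in this sketch.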
To better illustrate what I mean, here is an example.
The following code does not work as intended. The tasks run one after another:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from time import sleep

    def t(reference):
        for i in range(10):
            print(f"{reference} :{i}")
            sleep(1)

    futures = []
    refs = ["a", "b", "c"]
    for i in refs:
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures.append(executor.submit(t, i))

    for future in as_completed(futures):
        print(future.result())

Here is the fixed code:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from time import sleep

    def t(reference):
        for i in range(10):
            print(f"{reference} :{i}")
            sleep(1)

    futures = []
    refs = ["a", "b", "c"]
    with ThreadPoolExecutor(max_workers=3) as executor:  # swapped
        for i in refs:  # swapped
            futures.append(executor.submit(t, i))
        for future in as_completed(futures):
            print(future.result())

You can try it in a terminal and see the output.
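To put numbers on the difference, the two placements of the executor can be timed side by side. This is a rough sketch with hypothetical helper names (`work`, `executor_inside_loop`, `executor_outside_loop`, `timed`) and a shortened sleep so that it finishes quickly.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def work(reference):
    # Hypothetical task: sleeping stands in for I/O-bound work.
    sleep(0.2)
    return reference


def executor_inside_loop(refs):
    futures = []
    for ref in refs:
        # A new pool per item; leaving the with block waits for its task.
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures.append(executor.submit(work, ref))
    return [f.result() for f in futures]


def executor_outside_loop(refs):
    # One pool for all items, so the tasks can overlap.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(work, ref) for ref in refs]
    return [f.result() for f in futures]


def timed(func, refs):
    start = perf_counter()
    result = func(refs)
    return result, perf_counter() - start


refs = ["a", "b", "c"]
_, sequential_time = timed(executor_inside_loop, refs)
_, concurrent_time = timed(executor_outside_loop, refs)
print(f"executor inside loop:  {sequential_time:.2f}s")
print(f"executor outside loop: {concurrent_time:.2f}s")
```

The first variant should take roughly three times as long as the second. Keep in mind that the speed-up here comes from `sleep` releasing the global interpreter lock; a CPU-bound pure-Python function such as a gain computation may see little benefit from threads in CPython, in which case `ProcessPoolExecutor` from the same module is the usual thing to try.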