我目前正在尝试理解 Spark 计算过程及其对内存消耗的影响。
我使用的是 Spark 2.3.2 和 Python 2.7 在 Zeppelin 环境下工作。
基本上,在下面的循环中,我创建了两个集合。我使用 sci-kit-learn 构建了一个机器学习模型,并在 sci-kit-learn 计算之后对 pyspark-dataframes 进行了大量的数据框操作。对于每个 i,我得到了一个 rsmeMaeStep 表,包含 8 行和 10 列,里面是小字符串或双精度值。rsmeMaeAll 只是将单个分析结果累加起来,对于 i=26 时,它有 8*26=208 行和 10 列。
for i in range(26): df_features_train, df_features_validation = randomizer(dataFiltered) rsmeMaeStep, rsmeMaeAll = rsmeMaeAnalysis(rsmeMaeAll,df_features_train,df_features_test) print(i)
我对代码进行了一些时间分析。对于 i=1 时,耗时 17 秒;i=10 时,耗时 2 分 40 秒;i=26 时,耗时 6 分 42 秒。(即 10 次或 26 次循环耗时分别增加了 9.4 倍或 23.6 倍。)到目前为止,一切都在预期之中。下一步我遇到了问题。以下代码应该只是对 8 到 206 行的简单聚合操作。对于 i=1 时,耗时 32 秒;i=7 时,耗时 4 分 43 秒(增加了 8.8 倍);但对于 i=26 时,47 分钟后仍显示 0% 或因内存溢出而失败。
rsmeMae = rsmeMaeAll.select('set','setting','sme').orderBy('set','setting')import pyspark.sql.functions as frsmeMaeAverage = rsmeMae.groupBy('setting','set').agg(f.count(('setting')).alias('nrOfRand'), f.round(f.mean('sme'),2).alias('rsme'),f.round(f.stddev('sme'),2).alias('sigmaRsme')).orderBy('set','setting')z.show(rsmeMaeAverage)
根据逻辑,我认为在每次循环中所有表都应该被覆盖。只有小的 rsmeMaeAll 表会随着每次循环略有增加。但它仍然是一个很小的表。
但 Spark 的行为可能与此不同。
根据我的理解,第一步的 sk-learn 代码在第一步中执行。如果我正确理解 Spark 的惰性评估,我的代码中的 pySpark 操作会在我想打印结果时开始执行。因此,Spark 可能会将所有循环的表都保存在内存中。这是正确的吗?
如果我是对的,我需要在每次循环结束时直接计算 pySpark 代码。
我该如何做到这一点?
如果我这样做了,Spark 在下一个循环中会覆盖这些表,还是内存消耗仍然会随着每次循环增加?我需要主动从内存中删除这些表吗?如何操作?
编辑: 我刚刚将以下代码整合进了循环,以确保 pyspark 计算立即执行。但虽然第一个循环耗时 55 秒,第七个循环耗时超过 10 分钟,并且在第八个循环的 rsmeMaeAll.collect() 后 49 分钟后崩溃。错误信息如下:
Py4JJavaError: An error occurred while calling o13488.collectToPython. :java.lang.OutOfMemoryError: Java heap space
我真的不理解每次循环时间的指数增长。而且之前我至少能运行 10 个循环。那里发生了什么?
回答:
我认为问题与 Spark 的惰性评估有关。因为我收集了所有信息,pyspark 数据框 rsmeMaeAll 可能在尝试计算输出时,所有生成 rsmeMaeAll 所需的信息都被同时加载到了缓存中。
基于这个想法,我重构了代码,使 Spark 不需要再保留中间步骤。此外,我整合了时间测量,并重构了旧代码,分为两种变体,使一种变体更接近新逻辑,并确保每种变体的计算在每次循环结束时都必须完成。
解决方案如下:
for i in range(9): ti0 = time.time() df_features_train, df_features_test = randomizer(dataFiltered) rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test) rsmeMaeAllpd = rsmeMaeAllpd.append(rsmeMaeStep.toPandas()) print(rsmeMaeAllpd) ti1 = time.time() print "Time for loop", i, ":", ti1-ti0
在 rsmeMaeAnalysis 中,我只是计算了分析的结果,返回它们,将它们转换为 Pandas 数据框,并在 Pandas 中收集所有结果。结果是,每次循环所需的时间大致相同,即使在 20 个循环后也没有出现内存问题。前十个循环的时间如下:
41s ,42s ,44s ,40s ,43s ,43s ,40s ,39s ,40s ,40s
但随后我想确保在 pyspark 数据框中收集结果确实是问题所在,因此我构建了一个尽可能接近 pandas 解决方案的代码,但结果收集在 pyspark 数据框中:
for i in range(10): ti0 = time.time() df_features_train, df_features_test = randomizer(dataFiltered) rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test) rsmeMaeAll = rsmeMaeAll.union(rsmeMaeStep) rsmeMaeAll.show(80,False) ti1 = time.time() print "Time for loop", i, ":", ti1-ti0
前八个循环的时间如下:
43s ,63s ,88s ,144s ,162s ,175s ,212s ,276s
在原始变体中,仅加入时间测量,直到第 7 个循环后出现内存溢出错误,时间如下:
44s ,60s ,73s ,98s ,128s ,157s ,198s
最终似乎是惰性评估导致了大量生成 rsmeMaeAll 所需的信息在每次循环结束时都被同时加载到了缓存中,尽管这些信息中的大部分在循环结束时并不相关。