### 处理大量列时性能下降。Pyspark

我在处理宽数据框时遇到了问题(大约9000列,有时更多)。
任务:

  1. 通过groupBy和pivot创建宽数据框。
  2. 将列转换为向量,并使用pyspark.ml中的KMeans进行处理。

因此,我创建了一个大数据框,并尝试使用VectorAssembler创建向量,对其进行缓存,并在其上训练KMeans。
在我的PC上以独立模式处理大约500行x9000列的数据框时,组装过程大约需要11分钟,KMeans需要2分钟,处理7个不同数量的聚类。
相比之下,使用pandas处理(pivot数据框,并迭代7个聚类)不到一分钟。
显然我理解独立模式、缓存等带来的开销和性能下降,但这真的让我很沮丧。
有人能解释一下我如何避免这种开销吗?
人们如何处理宽数据框而不使用VectorAssembler并避免性能下降?
更正式的问题(符合stackoverflow规则)听起来像 – 我如何加速这段代码?

%%timetmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')       .groupBy('User')       .pivot('ObjectPath')       .agg({'PropertyFlagValue':'max'})       .fillna(0))ignore = ['User']assembler = VectorAssembler(    inputCols=[x for x in tmp.columns if x not in ignore],    outputCol='features')Wall time: 36.7 sprint(tmp.count(), len(tmp.columns))552, 9378%%timetransformed = assembler.transform(tmp).select('User', 'features').cache()Wall time: 10min 45s%%timelst_levels = []for num in range(3, 14):    kmeans = KMeans(k=num, maxIter=50)    model = kmeans.fit(transformed)    lst_levels.append(model.computeCost(transformed))rs = [i-j for i,j in list(zip(lst_levels, lst_levels[1:]))]for i, j in zip(rs, rs[1:]):    if i - j < j:        print(rs.index(i))        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)        model = kmeans.fit(transformed)        break Wall time: 1min 32s

配置:

.config("spark.sql.pivotMaxValues", "100000") \.config("spark.sql.autoBroadcastJoinThreshold", "-1") \.config("spark.sql.shuffle.partitions", "4") \.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \

回答:

实际上,解决方案是在map中找到的,用于RDD。

  1. 首先,我们要创建一个值的映射表。
  2. 同时提取所有不同的名称。
  3. 倒数第二步,我们在名称字典中搜索每行映射表的值,并返回值或如果未找到则返回0。
  4. 对结果应用向量组装器。

优点:

  1. 您不必创建具有大量列的数据框,从而避免了开销。(速度从11分钟提高到1分钟。)
  2. 您仍然在集群上工作,并在spark的范式中执行代码。

代码示例:scala实现

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注