在PySpark中对groupby操作中的列进行稀疏向量聚合

问题:我试图将稀疏向量按id合并成一个(这应该是按id分组行后的聚合结果)。

我正在操作的原始DataFrame(以及我应用了转换方法的DataFrame)看起来像这样:

输入

+---+-------+--------+--------+| id|   col1|    col2|    col3|+---+-------+--------+--------+|  1|  [Red]|  [John]|  [Male]||  1| [Blue]| [Alice]|[Female]||  1|[Green]|[Celine]|  [Male]||  2|  [Red]|   [Bob]|  [Male]||  1|  [Red]|  [John]|  [Male]||  2|[Green]| [Alice]|[Female]|+---+-------+--------+--------+

到目前为止,我已经完成了两个转换:

在第一步中,我使用了CountVectorizer来获取每行每列的特征向量,输出如下:

+---+-------------+-------------+-------------+|id |vectors1     |vectors2     |vectors3     |+---+-------------+-------------+-------------+|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])||1  |(3,[2],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])||1  |(3,[1],[1.0])|(4,[2],[1.0])|(2,[0],[1.0])||2  |(3,[0],[1.0])|(4,[3],[1.0])|(2,[0],[1.0])||1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])||2  |(3,[1],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|+---+-------------+-------------+-------------+

在第二步中(基于前一步的输出),我使用了VectorAssembler将所有这些列组装成一个名为features的列:

+---+-------------------------+|id |features                 |+---+-------------------------+|1  |(9,[0,4,7],[1.0,1.0,1.0])||1  |(9,[2,3,8],[1.0,1.0,1.0])||1  |(9,[1,5,7],[1.0,1.0,1.0])||2  |(9,[0,6,7],[1.0,1.0,1.0])||1  |(9,[0,4,7],[1.0,1.0,1.0])||2  |(9,[1,3,8],[1.0,1.0,1.0])|+---+-------------------------+

期望的解决方案/输出/DataFrame:我试图实现的是在id上执行groupby操作,然后使用某个特定的聚合函数,将第二个输出(或者可能是某种方式的第一个输出)转换成如下形式:

+--+-------------------------------------------------------+|id|features                                               |+--+-------------------------------------------------------+|1 |(9,[0,1,2,3,4,5,7,8],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])||2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|+--+-------------------------------------------------------+

我可能错了,但我最可能寻找的是能够对所有稀疏向量求和,并仅合并它们的内部数组的某种方法…

*另外,我也在寻找一个可选的解决方案,即在聚合函数中能够假设数组中特定特征出现的次数,因此替代性地,features列可以转换为如下形式:

+--+-------------------------------------------------------+|id|features                                               |+--+-------------------------------------------------------+|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])| # 0: 2次, 4: 2次, 7: 3次|2 |            (9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])|+--+-------------------------------------------------------+

回答:

我遇到了一个非常相似的问题,刚刚找到了一个非常糟糕的解决方案,使用了UDF。

从您提供的样本开始:

from pyspark.ml.linalg import SparseVector, DenseVectorimport pyspark.sql.functions as Fdf = sc.parallelize([    (1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),    (1, SparseVector(9,[2,3,8],[1.0,1.0,1.0])),    (1, SparseVector(9,[1,5,7],[1.0,1.0,1.0])),    (2, SparseVector(9,[0,6,7],[1.0,1.0,1.0])),    (1, SparseVector(9,[0,4,7],[1.0,1.0,1.0])),    (2, SparseVector(9,[1,3,8],[1.0,1.0,1.0])),]).toDF(["id", "features"])

我创建了这个UDF来添加向量:

from pyspark.ml.linalg import Vectors, VectorUDTimport numpy as np@F.udf(returnType=VectorUDT())def elementwise_sum(vectors):    res = None    for vec in vectors:        if res is None:            res = vec        else:            res = np.add(vec,res)    return SparseVector(len(res),{k: v for k,v in enumerate(res) if v != 0})

有了这个,您将能够聚合向量并返回一个结果向量

df = df.groupBy('id').agg(elementwise_sum(F.collect_list('features')).alias('features'))df.show(10,False)+---+-------------------------------------------------------+|id |features_raw                                           |+---+-------------------------------------------------------+|1  |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])||2  |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |+---+-------------------------------------------------------+

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注