Question: I am trying to merge sparse vectors into one by id (this should be the aggregate of the rows after grouping them by id).
The original DataFrame I am working on (and to which I have applied the transformation methods) looks like this:
Input:
+---+-------+--------+--------+
| id|   col1|    col2|    col3|
+---+-------+--------+--------+
|  1|  [Red]|  [John]|  [Male]|
|  1| [Blue]| [Alice]|[Female]|
|  1|[Green]|[Celine]|  [Male]|
|  2|  [Red]|   [Bob]|  [Male]|
|  1|  [Red]|  [John]|  [Male]|
|  2|[Green]| [Alice]|[Female]|
+---+-------+--------+--------+
So far I have applied two transformations:
In the first step I used CountVectorizer to obtain a feature vector per column for every row. The output looks like this:
+---+-------------+-------------+-------------+
|id |vectors1     |vectors2     |vectors3     |
+---+-------------+-------------+-------------+
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|1  |(3,[2],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
|1  |(3,[1],[1.0])|(4,[2],[1.0])|(2,[0],[1.0])|
|2  |(3,[0],[1.0])|(4,[3],[1.0])|(2,[0],[1.0])|
|1  |(3,[0],[1.0])|(4,[1],[1.0])|(2,[0],[1.0])|
|2  |(3,[1],[1.0])|(4,[0],[1.0])|(2,[1],[1.0])|
+---+-------------+-------------+-------------+
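(For reference, a minimal sketch of how this first step can be reproduced, assuming col1/col2/col3 are array-of-string columns; the loop and the exact fitting style are illustrative, not from the question:)

from pyspark.ml.feature import CountVectorizer

# One CountVectorizer per column; the output names match the table above.
for in_col, out_col in [("col1", "vectors1"), ("col2", "vectors2"), ("col3", "vectors3")]:
    df = CountVectorizer(inputCol=in_col, outputCol=out_col).fit(df).transform(df)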
In the second step (building on the previous output), I used VectorAssembler to assemble all of these columns into a single column named features:
+---+-------------------------+
|id |features                 |
+---+-------------------------+
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|1  |(9,[2,3,8],[1.0,1.0,1.0])|
|1  |(9,[1,5,7],[1.0,1.0,1.0])|
|2  |(9,[0,6,7],[1.0,1.0,1.0])|
|1  |(9,[0,4,7],[1.0,1.0,1.0])|
|2  |(9,[1,3,8],[1.0,1.0,1.0])|
+---+-------------------------+
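(Likewise, a minimal sketch of the assembly step, under the same column-name assumptions:)

from pyspark.ml.feature import VectorAssembler

# Concatenate the three count vectors (3 + 4 + 2 = 9 dimensions) into one column.
assembler = VectorAssembler(inputCols=["vectors1", "vectors2", "vectors3"],
                            outputCol="features")
df = assembler.transform(df).select("id", "features")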
Desired solution/output/DataFrame: What I am trying to achieve is to group by id and then apply some aggregation function that turns the second output (or possibly, in some way, the first one) into the following:
+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2 |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+--+-------------------------------------------------------+
I may be wrong, but what I am most likely looking for is a way to sum all the sparse vectors, merging just their inner arrays...
*In addition, I am also looking for an optional solution in which the aggregation function counts how many times each feature occurs in the arrays, so that, alternatively, the features column could be transformed into this:
+--+-------------------------------------------------------+
|id|features                                               |
+--+-------------------------------------------------------+
|1 |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])| # 0: 2 times, 4: 2 times, 7: 3 times
|2 |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+--+-------------------------------------------------------+
Answer:
I ran into a very similar problem and just found a rather ugly solution using a UDF.
Starting from your sample:
from pyspark.ml.linalg import SparseVector
import pyspark.sql.functions as F

df = sc.parallelize([
    (1, SparseVector(9, [0, 4, 7], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [2, 3, 8], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [1, 5, 7], [1.0, 1.0, 1.0])),
    (2, SparseVector(9, [0, 6, 7], [1.0, 1.0, 1.0])),
    (1, SparseVector(9, [0, 4, 7], [1.0, 1.0, 1.0])),
    (2, SparseVector(9, [1, 3, 8], [1.0, 1.0, 1.0])),
]).toDF(["id", "features"])
I created this UDF to add up the vectors:
from pyspark.ml.linalg import VectorUDT
import numpy as np

@F.udf(returnType=VectorUDT())
def elementwise_sum(vectors):
    # Sum the collected vectors element-wise as dense NumPy arrays.
    res = None
    for vec in vectors:
        arr = vec.toArray()
        res = arr if res is None else res + arr
    # Re-sparsify the result, keeping only the non-zero entries.
    return SparseVector(len(res), {i: float(v) for i, v in enumerate(res) if v != 0})
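Note that F.collect_list first gathers all vectors of a group into a single array on one row, so the UDF sees one whole group at a time; for very large groups this can put memory pressure on the executors.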
With this, you will be able to aggregate the vectors and get a single result vector back:
df = df.groupBy('id').agg(elementwise_sum(F.collect_list('features')).alias('features'))
df.show(10, False)

+---+-------------------------------------------------------+
|id |features                                               |
+---+-------------------------------------------------------+
|1  |(9,[0,1,2,3,4,5,7,8],[2.0,1.0,1.0,1.0,2.0,1.0,3.0,1.0])|
|2  |(9,[0,1,3,6,7,8],[1.0,1.0,1.0,1.0,1.0,1.0])            |
+---+-------------------------------------------------------+
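The values above are occurrence counts, which matches your optional output. If you want the purely binary version from your first desired output instead, a small variant of the same UDF works; a sketch, where elementwise_max is a hypothetical name of mine, not from the question:

@F.udf(returnType=VectorUDT())
def elementwise_max(vectors):
    # Element-wise maximum records only presence (1.0), not counts.
    res = None
    for vec in vectors:
        arr = vec.toArray()
        res = arr if res is None else np.maximum(res, arr)
    return SparseVector(len(res), {i: 1.0 for i, v in enumerate(res) if v != 0})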