如何在Spark SQL中使用group by后添加稀疏向量?

我正在开发一个新闻推荐系统,需要为用户及其阅读的新闻构建一张表。我的原始数据如下所示:

001436800277225 ["9161492","9161787","9378531"]009092130698762 ["9394697"]010003000431538 ["9394697","9426473","9428530"]010156461231357 ["9350394","9414181"]010216216021063 ["9173862","9247870"]010720006581483 ["9018786"]011199797794333 ["9017977","9091134","9142852","9325464","9331913"]011337201765123 ["9161294","9198693"]011414545455156 ["9168185","9178348","9182782","9359776"]011425002581540 ["9083446","9161294","9309432"]

我使用Spark SQL进行展开和独热编码处理,

df = getdf()df1 = df.select('uuid',explode('news').alias('news'))stringIndexer = StringIndexer(inputCol="news", outputCol="newsIndex")model = stringIndexer.fit(df1)indexed = model.transform(df1)encoder = OneHotEncoder(inputCol="newsIndex", outputCol="newsVec")encoded = encoder.transform(indexed)encoded.show(20,False)

处理后,我的数据变成了这样:

+---------------+-------+---------+----------------------+|uuid           |news   |newsIndex|newsVec               |+---------------+-------+---------+----------------------+|014324000386050|9398253|10415.0  |(105721,[10415],[1.0])||014324000386050|9428530|70.0     |(105721,[70],[1.0])   ||014324000631752|654112 |1717.0   |(105721,[1717],[1.0]) ||014324000674240|730531 |2282.0   |(105721,[2282],[1.0]) ||014324000674240|694306 |1268.0   |(105721,[1268],[1.0]) ||014324000674240|712016 |4766.0   |(105721,[4766],[1.0]) ||014324000674240|672307 |7318.0   |(105721,[7318],[1.0]) ||014324000674240|698073 |1241.0   |(105721,[1241],[1.0]) ||014324000674240|728044 |5302.0   |(105721,[5302],[1.0]) ||014324000674240|672256 |1619.0   |(105721,[1619],[1.0]) ||014324000674240|730236 |2376.0   |(105721,[2376],[1.0]) ||014324000674240|730235 |14274.0  |(105721,[14274],[1.0])||014324000674240|728509 |1743.0   |(105721,[1743],[1.0]) ||014324000674240|704528 |10310.0  |(105721,[10310],[1.0])||014324000715399|774134 |8876.0   |(105721,[8876],[1.0]) ||014324000725836|9357431|3479.0   |(105721,[3479],[1.0]) ||014324000725836|9358028|15621.0  |(105721,[15621],[1.0])||014324000730349|812106 |4599.0   |(105721,[4599],[1.0]) ||014324000730349|699237 |754.0    |(105721,[754],[1.0])  ||014324000730349|748109 |4854.0   |(105721,[4854],[1.0]) |+---------------+-------+---------+----------------------+

但是每个ID有多个行,所以我想使用groupBy('uuid')然后add这些向量。但仅仅使用groupBy然后加会出错。我该怎么做呢?


回答:

indexed开始,我们可以将newsIndex列收集为一个列表,并使用udf将其转换为SparseVector

要声明一个稀疏向量,我们需要特征的数量和包含位置和值的元组列表。因为我们处理的是分类变量,所以值我们将使用1.0。而索引将是newsIndex列:

from pyspark.sql.functions import collect_list, max, litfrom pyspark.ml.linalg import Vectors, VectorUDTdef encode(arr, length):  vec_args =  length, [(x,1.0) for x in arr]  return Vectors.sparse(*vec_args)   encode_udf = udf(encode, VectorUDT())

特征的数量是max(newsIndex) + 1(因为StringIndexer0.0开始):

feats = indexed.agg(max(indexed["newsIndex"])).take(1)[0][0] + 1

将所有内容整合在一起:

indexed.groupBy("uuid") \       .agg(collect_list("newsIndex")       .alias("newsArr")) \       .select("uuid",                encode_udf("newsArr", lit(feats))               .alias("OHE")) \       .show(truncate = False)+---------------+-----------------------------------------+|uuid           |OHE                                      |+---------------+-----------------------------------------+|009092130698762|(24,[0],[1.0])                           ||010003000431538|(24,[0,3,15],[1.0,1.0,1.0])              ||010720006581483|(24,[11],[1.0])                          ||010216216021063|(24,[10,22],[1.0,1.0])                   ||001436800277225|(24,[2,12,23],[1.0,1.0,1.0])             ||011425002581540|(24,[1,5,9],[1.0,1.0,1.0])               ||010156461231357|(24,[13,18],[1.0,1.0])                   ||011199797794333|(24,[7,8,17,19,20],[1.0,1.0,1.0,1.0,1.0])||011414545455156|(24,[4,6,14,21],[1.0,1.0,1.0,1.0])       ||011337201765123|(24,[1,16],[1.0,1.0])                    |+---------------+-----------------------------------------+

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注