我知道关于如何通过加盐键等方法最佳分区你的DataFrames
或RDD
的问题有很多,但我认为这种情况足够不同,值得提出一个独立的问题。
我正在用PySpark构建一个协同过滤推荐引擎,这意味着每个用户(行)的独特项目评分需要进行比较。因此,对于一个尺寸为M (行) x N (列)
的DataFrame
,这意味着数据集将变为M x (K choose 2)
,其中K << N
是用户的非空(即已评分)元素的数量。
我的算法在用户评分的项目数量大致均匀的数据集上运行得相当好且高效。然而,对于某些用户评分了大量项目(比同一分区中的其他用户大几个数量级)的场景,我的数据变得极度倾斜,最后几个分区开始花费大量时间。考虑一个简单的例子,以下是DataFrame
:
cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
(1, 4.5, 3.5, None, 1.0, None), # 用户1
(2, 2.0, None, 5.0, 4.0, 3.0), # 用户2
(3, 3.5, 5.0, 1.0, None, 1.0), # 用户3
(4, None, None, 4.5, 3.5, 4.0), # 用户4
(5, None, None, None, None, 4.5) # 用户5
]
sc.parallelize(ratings, 2).toDF(cols)
我的情况出现在这个DataFrame
的更大变体中(大约100万用户和1万个项目),其中一些用户评分的电影比例远高于其他人。最初,我对DataFrame
进行了如下稀疏处理:
def _make_ratings(row):
import numpy as np
non_null_mask = ~np.isnan(row)
idcs = np.where(non_null_mask)[0] # 提取非空索引掩码
# 将非空索引与相应的评分进行组合
rtgs = row[non_null_mask]
return list(zip(idcs, rtgs))
def as_array(partition):
import numpy as np
for row in partition:
yield _make_ratings(np.asarray(row, dtype=np.float32))
# 删除id列,获取RDD,并创建np.ndarrays的副本
ratings = R.drop('id').rdd\
.mapPartitions(as_array)\
.cache()
然后,我可以按以下方式检查每个分区所需的互助评分对的数量:
n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)
最初,这是我得到的每个分区的互助评分对的分布:
如你所见,这显然无法扩展。因此,我尝试通过更智能的方式在源头分区我的数据框架。我想出了以下函数,它将随机分区我的数据框架行:
def shuffle_partition(X, n_partitions, col_name='shuffle'):
from pyspark.sql.functions import rand
X2 = X.withColumn(col_name, rand())
return X2.repartition(n_partitions, col_name).drop(col_name)
这在某种程度上有效。应用后,这是新的分布:
这显然扩展得更好,但仍然不符合我的期望。一定有办法更均匀地在分区中分配这些“高评分用户”,但我就是想不出来。我一直在考虑按“每个用户的评分数量”列进行分区,但这最终会将所有高评分用户集中在一起,而不是将他们分开。
我是不是忽略了什么显而易见的东西?
更新
我实现了igrinis的解决方案在以下函数中(我相信有更优雅的方式来编写这个,但我对DataFrame
API不太熟悉,所以我回到了RDD
来做这个——欢迎批评),但分布大致与原始分布相同,所以不确定我是否做错了什么…:
def partition_by_rating_density(X, id_col_name, n_partitions,
partition_col_name='partition'):
"""根据评分密度分段分区。分区将根据每个用户的评分数量更均匀地分布。
参数
----------
X : PySpark DataFrame
评分矩阵
id_col_name : str
ID列名
n_partitions : int
新DataFrame中的分区数量。
partition_col_name : str
分区列的名称
返回
-------
with_partition_key : PySpark DataFrame
分区后的DataFrame
"""
ididx = X.columns.index(id_col_name)
def count_non_null(row):
sm = sum(1 if v is not None else 0
for i, v in enumerate(row) if i != ididx)
return row[ididx], sm
# 将计数作为最后一个元素添加,id作为第一个元素
counted = X.rdd.map(count_non_null)\
.sortBy(lambda r: r[-1], ascending=False)
# 获取计数数组,zip它与索引,然后flatMap
# 它以获取排序后的索引
indexed = counted.zipWithIndex()\
.map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
.toDF([id_col_name, partition_col_name])
# 与indexed重新连接,现在它有分区列
counted_indexed = X.join(indexed, on=id_col_name, how='inner')
# 要删除的列
return counted_indexed.repartition(n_partitions, partition_col_name)\
.drop(partition_col_name)
回答: