### 通过行中非空元素数量均匀分区PySpark Dataframe

我知道关于如何通过加盐键等方法最佳分区你的DataFramesRDD的问题有很多,但我认为这种情况足够不同,值得提出一个独立的问题。

我正在用PySpark构建一个协同过滤推荐引擎,这意味着每个用户(行)的独特项目评分需要进行比较。因此,对于一个尺寸为M (行) x N (列)DataFrame,这意味着数据集将变为M x (K choose 2),其中K << N是用户的非空(即已评分)元素的数量。

我的算法在用户评分的项目数量大致均匀的数据集上运行得相当好且高效。然而,对于某些用户评分了大量项目(比同一分区中的其他用户大几个数量级)的场景,我的数据变得极度倾斜,最后几个分区开始花费大量时间。考虑一个简单的例子,以下是DataFrame

cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
    (1, 4.5,  3.5,  None, 1.0,  None),  # 用户1
    (2, 2.0,  None, 5.0,  4.0,  3.0),   # 用户2
    (3, 3.5,  5.0,  1.0,  None, 1.0),   # 用户3
    (4, None, None, 4.5,  3.5,  4.0),   # 用户4
    (5, None, None, None, None, 4.5)    # 用户5
]
sc.parallelize(ratings, 2).toDF(cols)

我的情况出现在这个DataFrame的更大变体中(大约100万用户和1万个项目),其中一些用户评分的电影比例远高于其他人。最初,我对DataFrame进行了如下稀疏处理:

def _make_ratings(row):
    import numpy as np
    non_null_mask = ~np.isnan(row)
    idcs = np.where(non_null_mask)[0]  # 提取非空索引掩码
    # 将非空索引与相应的评分进行组合
    rtgs = row[non_null_mask]
    return list(zip(idcs, rtgs))
def as_array(partition):
    import numpy as np
    for row in partition:
        yield _make_ratings(np.asarray(row, dtype=np.float32))
# 删除id列,获取RDD,并创建np.ndarrays的副本
ratings = R.drop('id').rdd\
           .mapPartitions(as_array)\
           .cache()

然后,我可以按以下方式检查每个分区所需的互助评分对的数量:

n_choose_2 = (lambda itrbl: (len(itrbl) * (len(itrbl) - 1)) / 2.)
sorted(ratings.map(n_choose_2).glom().map(sum).collect(), reverse=True)

最初,这是我得到的每个分区的互助评分对的分布:

第一次分布

如你所见,这显然无法扩展。因此,我尝试通过更智能的方式在源头分区我的数据框架。我想出了以下函数,它将随机分区我的数据框架行:

def shuffle_partition(X, n_partitions, col_name='shuffle'):
    from pyspark.sql.functions import rand
    X2 = X.withColumn(col_name, rand())
    return X2.repartition(n_partitions, col_name).drop(col_name)

这在某种程度上有效。应用后,这是新的分布:

第二次分布

这显然扩展得更好,但仍然不符合我的期望。一定有办法更均匀地在分区中分配这些“高评分用户”,但我就是想不出来。我一直在考虑按“每个用户的评分数量”列进行分区,但这最终会将所有高评分用户集中在一起,而不是将他们分开。

我是不是忽略了什么显而易见的东西?

更新

我实现了igrinis的解决方案在以下函数中(我相信有更优雅的方式来编写这个,但我对DataFrameAPI不太熟悉,所以我回到了RDD来做这个——欢迎批评),但分布大致与原始分布相同,所以不确定我是否做错了什么…:

def partition_by_rating_density(X, id_col_name, n_partitions,
                                partition_col_name='partition'):
    """根据评分密度分段分区。分区将根据每个用户的评分数量更均匀地分布。
    参数
    ----------
    X : PySpark DataFrame
        评分矩阵
    id_col_name : str
        ID列名
    n_partitions : int
        新DataFrame中的分区数量。
    partition_col_name : str
        分区列的名称
    返回
    -------
    with_partition_key : PySpark DataFrame
        分区后的DataFrame
    """
    ididx = X.columns.index(id_col_name)
    def count_non_null(row):
        sm = sum(1 if v is not None else 0
                 for i, v in enumerate(row) if i != ididx)
        return row[ididx], sm
    # 将计数作为最后一个元素添加,id作为第一个元素
    counted = X.rdd.map(count_non_null)\
               .sortBy(lambda r: r[-1], ascending=False)
    # 获取计数数组,zip它与索引,然后flatMap
    # 它以获取排序后的索引
    indexed = counted.zipWithIndex()\
                     .map(lambda ti: (ti[0][0], ti[1] % n_partitions))\
                     .toDF([id_col_name, partition_col_name])
    # 与indexed重新连接,现在它有分区列
    counted_indexed = X.join(indexed, on=id_col_name, how='inner')
    # 要删除的列
    return counted_indexed.repartition(n_partitions, partition_col_name)\
        .drop(partition_col_name)

回答:

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注