如何在pyspark中按列值而非按行进行训练/测试分割

我想为机器学习生成训练集和测试集。假设我有一个包含以下列的数据框:

account_id | session_id | feature_1 | feature_2 | label

在这个数据集中,每一行都有一个唯一的session_id,但account_id可能会多次出现。然而,我希望我的训练集和测试集拥有互斥的account_id。(这几乎像是分层抽样的相反操作)。

在pandas中,这很简单。我有类似如下的代码:

def train_test_split(df, split_col, feature_cols, label_col, test_fraction=0.2):
    """
    虽然sklearn的train_test_split是按数据集中的每一行进行分割,
    但这个函数将按特定列进行分割。这样,我们可以分离account_id,
    使得训练集和测试集拥有互斥的账户,以最小化训练集和测试集之间的交叉影响。
    """
    split_values = df[split_col].drop_duplicates()
    test_values = split_values.sample(frac=test_fraction, random_state=42)

    df_test = df[df[split_col].isin(test_values)]
    df_train = df[~df[split_col].isin(test_values)]
    return df_test, df_train

现在,我的数据集足够大,无法装入内存,我必须从pandas切换到在pyspark中完成所有这些操作。如何在pyspark中分割训练集和测试集,使其拥有互斥的account_id,而不需要将所有数据装入内存?


回答:

您可以使用pyspark.sql.functions中的rand()函数,为每个不同的account_id生成一个随机数,并基于这个随机数创建traintest数据框。

from pyspark.sql import functions as F
TEST_FRACTION = 0.2
train_test_split = (df.select("account_id")
                      .distinct()  # 移除重复的account_ids
                      .withColumn("rand_val", F.rand())
                      .withColumn("data_type", F.when(F.col("rand_val") < TEST_FRACTION, "test")
                                                .otherwise("train")))
train_df = (train_test_split.filter(F.col("data_type") == "train")
                            .join(df, on="account_id"))  # 内连接移除所有非训练行的数据
test_df = (train_test_split.filter(F.col("data_type") == "test")
                           .join(df, on="account_id"))

由于一个account_id不能同时是traintest,因此train_dftest_df将拥有互斥的account_id

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注