我想为机器学习生成训练集和测试集。假设我有一个包含以下列的数据框:
account_id | session_id | feature_1 | feature_2 | label
在这个数据集中,每一行都有一个唯一的session_id,但account_id可能会多次出现。然而,我希望我的训练集和测试集拥有互斥的account_id。(这几乎像是分层抽样的相反操作)。
在pandas中,这很简单。我有类似如下的代码:
def train_test_split(df, split_col, feature_cols, label_col, test_fraction=0.2):
"""
虽然sklearn的train_test_split是按数据集中的每一行进行分割,
但这个函数将按特定列进行分割。这样,我们可以分离account_id,
使得训练集和测试集拥有互斥的账户,以最小化训练集和测试集之间的交叉影响。
"""
split_values = df[split_col].drop_duplicates()
test_values = split_values.sample(frac=test_fraction, random_state=42)
df_test = df[df[split_col].isin(test_values)]
df_train = df[~df[split_col].isin(test_values)]
return df_test, df_train
现在,我的数据集足够大,无法装入内存,我必须从pandas切换到在pyspark中完成所有这些操作。如何在pyspark中分割训练集和测试集,使其拥有互斥的account_id,而不需要将所有数据装入内存?
回答:
您可以使用pyspark.sql.functions
中的rand()
函数,为每个不同的account_id
生成一个随机数,并基于这个随机数创建train
和test
数据框。
from pyspark.sql import functions as F
TEST_FRACTION = 0.2
train_test_split = (df.select("account_id")
.distinct() # 移除重复的account_ids
.withColumn("rand_val", F.rand())
.withColumn("data_type", F.when(F.col("rand_val") < TEST_FRACTION, "test")
.otherwise("train")))
train_df = (train_test_split.filter(F.col("data_type") == "train")
.join(df, on="account_id")) # 内连接移除所有非训练行的数据
test_df = (train_test_split.filter(F.col("data_type") == "test")
.join(df, on="account_id"))
由于一个account_id
不能同时是train
和test
,因此train_df
和test_df
将拥有互斥的account_id
。