How can I assign a statistical frequency to the records/rows of a dataframe in PySpark, without using the .toPandas() method?

I am new to PySpark, and I want to translate a preprocessing script, including its encoding and normalization steps, from Pythonic (pandas) style to PySpark, for synthetic data. (Columns A and C are categorical.) To begin with, I have a Spark dataframe called sdf containing 5 columns:

Here is a sample:

#+----------+-----+---+-------+----+
#|A         |B    |C  |D      |E   |
#+----------+-----+---+-------+----+
#|Sentence  |92   |6  |False  |49  |
#|Sentence  |17   |3  |False  |15  |
#|Sentence  |17   |3  |False  |15  |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#+----------+-----+---+-------+----+

Now I want to assign a statistical frequency alongside the other features and join the result back to sdf. So far, I can do this with a Pythonic (pandas) script:

#import libs
import copy
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import scale
from sklearn import preprocessing

# Statistical preprocessing
def add_freq_to_features(df):
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
  frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalize counts to fractions in [0, 1]
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
  return new_df

# Encode and normalize
def normalize_features(df):
  temp_df = df.copy()
  le = preprocessing.LabelEncoder()
  #le.fit(temp_df)
  temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
  for column in ["A", "B", "C", "D", "E"]:
    # reshape(-1, 1): one column with one row per sample, the 2-D shape MinMaxScaler expects
    temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1))
  return temp_df

# Apply frequency allocation and merge with the extracted-features df
features_df = add_freq_to_features(features_df)

# Apply the encoding and normalizing function
normalized_features_df = normalize_features(features_df)
to_numeric_columns = ["A", "B", "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df

Question: What is the best way to translate this preprocessing without converting the Spark dataframe to a pandas one via toPandas(), so that the pipeline stays optimized and runs 100% in Spark?

The expected output looks like this, as a Spark dataframe:

#+----------+-----+---+-------+----+----------+
#|A         |B    |C  |D      |E   |Freq      |
#+----------+-----+---+-------+----+----------+
#|Sentence  |92   |6  |False  |49  |0.166667  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#+----------+-----+---+-------+----+----------+

Answer:

Spark ships with a package called Spark MLlib dedicated to feature engineering and machine learning, so you should not build features by hand the way you would with pandas. Ultimately you will still need Spark to build the model, so why not use Spark ML properly from the start? I strongly recommend reading a few chapters, such as building feature pipelines, then classification/regression, plus a few other algorithms.

Back to your original question, here is the Spark version of your sample code (I also ran it in your notebook, with a few small changes to match your variable names):

# this is to build "raw" Freq
from pyspark.sql import functions as F

sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string')))
sdf2.cache().count()
sdf2.show()

# this is to normalize features using MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
assembler = VectorAssembler(
    inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'],
    outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])

# Compute summary statistics and generate the model
model = pipeline.fit(sdf2)

# Rescale each feature to the range [min, max]
model.transform(sdf2).show(10, False)

# Output
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |Type  |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features                 |scaled_features          |
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
# |String|8     |0           |true         |7                |1   |0.0     |0.0              |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
# |String|0     |0           |true         |0                |1   |0.0     |0.0              |(6,[5],[1.0])            |[0.5,0.0,0.5,0.5,0.0,0.5]|
# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
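One detail to watch: Freq in sdf2 above is a raw count, while the expected output in the question holds fractions of the total row count. Below is a minimal sketch of that normalization in pure Spark, assuming the original A-E columns from the question; the join-back step is illustrative (it mirrors the pd.merge in the pandas version) rather than part of the answer's code:

from pyspark.sql import functions as F

# Per-group count divided by the total row count gives the fraction,
# mirroring frequencies_df["Freq"] / frequencies_df["Freq"].sum() in pandas.
total = sdf.count()
freq = (sdf
    .groupBy(sdf.columns)
    .agg((F.count('*') / F.lit(total)).alias('Freq')))

# Left-join on all original columns, as pd.merge did, so duplicate rows keep their Freq.
sdf_with_freq = sdf.join(freq, on=sdf.columns, how='left')
sdf_with_freq.show()

If you also want the scaled values back as plain columns rather than a single vector, pyspark.ml.functions.vector_to_array (available since Spark 3.0) can unpack the scaled_features column.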
