我是PySpark的新手,我想将包括编码和归一化部分的预处理脚本从Pythonic风格转换为PySpark,用于合成数据。(A列和C列是分类列)首先,我有一个名为sdf
的Spark数据框,包含5列:
下面是一个示例:
#+----------+-----+---+-------+----+#|A |B |C |D |E |#+----------+-----+---+-------+----+#|Sentence |92 |6 |False |49 |#|Sentence |17 |3 |False |15 |#|Sentence |17 |3 |False |15 |#|- |0 |0 |False |0 |#|- |0 |0 |False |0 |#|- |0 |0 |False |0 |#+----------+-----+---+-------+----+
现在我想在其他特征之外分配统计频率,并将结果与sdf
连接。到目前为止,我可以使用Pythonic脚本完成这个任务:
#import libsimport copyimport numpy as npimport pandas as pdfrom sklearn.preprocessing import LabelEncoderfrom sklearn.preprocessing import MinMaxScalerfrom sklearn.preprocessing import scalefrom sklearn import preprocessing#Statistical Preprocessingdef add_freq_to_features(df): frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"}) frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum() # Normalzing 0 & 1 new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns)) return new_df# Encode and Normalizedef normalize_features(df): temp_df = df.copy() le = preprocessing.LabelEncoder() #le.fit(temp_df) temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform) for column in ["A", "B", "C", "D", "E"]: #-1: all rows selected into 1 # reshape(1, -1) select one row contains all columns/features temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1)) return temp_df# Apply frequency allocation and merge with extracted features dffeatures_df = add_freq_to_features(features_df)#Apply Encoding and Normalizing functionnormalized_features_df = normalize_features(features_df)to_numeric_columns = ["A", "B" , "C", "D", "E", "Freq"]normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)#normalized_features_df
问题:在不将Spark数据框转换为Pandas数据框toPandas()
的情况下,翻译预处理的最佳方法是什么,以优化管道并100%使用Spark形式处理?
预期输出如下所示,以Spark数据框的形式呈现:
#+----------+-----+---+-------+----+----------+#|A |B |C |D |E |Freq |#+----------+-----+---+-------+----+----------+#|Sentence |92 |6 |False |49 |0.166667 |#|Sentence |17 |3 |False |15 |0.333333 |#|Sentence |17 |3 |False |15 |0.333333 |#|- |0 |0 |False |0 |0.500000 |#|- |0 |0 |False |0 |0.500000 |#|- |0 |0 |False |0 |0.500000 |#+----------+-----+---+-------+----+----------+
回答:
Spark有一个名为Spark MLlib的包,专门用于特征工程和机器学习。因此,您不应该像使用Pandas那样手动构建特征。最终,您仍然需要使用Spark来构建模型,那么为什么不从一开始就正确使用Spark ML呢?我强烈建议阅读一些章节,如构建特征,构建管道,然后是分类/回归,以及其他一些算法。
回到您的原始问题,这是您示例代码的Spark版本(我也将其在您的笔记本中运行,并做了一些小的更改以适应您的变量。)
# this is to build "raw" Freqsdf2 = (sdf .groupBy(sdf.columns) .agg(F.count('*').alias('Freq')) .withColumn('Encoding_type', F.col('Encoding_type').cast('string')))sdf2.cache().count()sdf2.show()# this is to normalize features using MinMaxScalerfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import StringIndexerfrom pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.feature import MinMaxScalertype_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])# Compute summary statistics and generate modelmodel = pipeline.fit(sdf2)# rescale each feature to range [min, max].model.transform(sdf2).show(10, False)# Output# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+# |Type |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features |scaled_features |# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+# |String|8 |0 |true |7 |1 |0.0 |0.0 |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|# |String|0 |0 |true |0 |1 |0.0 |0.0 |(6,[5],[1.0]) |[0.5,0.0,0.5,0.5,0.0,0.5]|# +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+