数据框中的特征列变为null

我是Spark的新手,我需要对我的数据进行一些机器学习操作,并预测“count”值。以下是我的原始数据:

05:49:56.604899 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 10202: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [.], seq 3641977583:3641987719, ack 129899328, win 58, options [nop,nop,TS val 432623 ecr 432619], length 1013605:49:56.604908 00:00:00:00:00:03 > 00:00:00:00:00:02, ethertype IPv4 (0x0800), length 66: 10.0.0.3.5001 > 10.0.0.2.54880: Flags [.], ack 10136, win 153, options [nop,nop,TS val 432623 ecr 432623], length 0

我使用以下代码创建了一个包含time_stamp_0、sender_ip_1和receiver_ip_2列的数据框:

  val customSchema = StructType(Array(  StructField("time_stamp_0", StringType, true),  StructField("sender_ip_1", StringType, true),  StructField("receiver_ip_2", StringType, true)))///////////////////////////////////////////////////make train dataframeval Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/Test/trace1.txt")val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {  val first = Try(array(0).trim.split(" ")(0)) getOrElse ""  val second = Try(array(1).trim.split(" ")(6)) getOrElse ""  val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""  val firstFixed = first.take(first.lastIndexOf("."))  val secondfix = second.take(second.lastIndexOf("."))  val thirdFixed = third.take(third.lastIndexOf("."))  Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed))})val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema).toDF("time_stamp_0", "sender_ip_1", "receiver_ip_2")val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2") // <-- matches groupBy///I add count to the dataframeval Dataframe = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2)Dataframe.show()

以下是输出结果:

+------------+-----------+-------------+-----+|time_stamp_0|sender_ip_1|receiver_ip_2|count|+------------+-----------+-------------+-----+|    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.2|     10.0.0.3|   19||    05:49:56|   10.0.0.3|     10.0.0.2|   10|+------------+-----------+-------------+-----+

我想预测两个IP之间的连接数量。我在数据框中添加了count列。我还尝试创建标签和特征来开始预测。我还需要将数据分割为训练和测试部分。我使用了以下代码:

    val toVec4    = udf[Vector, Int, Int, String, String] { (a,b,c,d) =>      val e3 = c match {        case "10.0.0.1" => 1        case "10.0.0.2" => 2        case "10.0.0.3" => 3      }      val e4 = d match {        case "10.0.0.1" => 1        case "10.0.0.2" => 2        case "10.0.0.3" => 3      }      Vectors.dense(a, b, e3, e4)    }    //val encodeLabel   = udf[Double, String]( _ match { case "A" => 0.0 case "B" => 1.0} )    val final_df = Dataframe.withColumn(      "features",      toVec4(        Dataframe("time_stamp_0"),        Dataframe("count"),        Dataframe("sender_ip_1"),        Dataframe("receiver_ip_2")      )    ).withColumn("label", (Dataframe("count"))).select("features", "label")final_df.show()    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))    val TrainingDF = trainingTest(0)    val TestingDF=trainingTest(1)    //TrainingDF.show()    //TestingDF.show()

然而,问题是特征变成了null!

+--------+-----+|features|label|+--------+-----+|    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   19||    null|   10|+--------+-----+

有谁能帮我解决这个问题吗?提前谢谢。


回答:

这里的问题是你的UDF期望四个输入列的类型为Int, Int, String, String,而你传递的第一列(time_stamp_0)是String类型。

你可以通过调整UDF或将字段转换为Int来解决这个问题:

import org.apache.spark.sql.functions._import org.apache.spark.sql.types._val final_df = df.withColumn(  "features",  toVec4(    // 将字符串转换为Timestamp类型,然后转换为Int类型    $"time_stamp_0".cast(TimestampType).cast(IntegerType),    $"count",    $"sender_ip_1",    $"receiver_ip_2"  ))

我必须说我期望看到一个适当的异常而不是null结果,但显然这是当前的行为表现。

Related Posts

使用LSTM在Python中预测未来值

这段代码可以预测指定股票的当前日期之前的值,但不能预测…

如何在gensim的word2vec模型中查找双词组的相似性

我有一个word2vec模型,假设我使用的是googl…

dask_xgboost.predict 可以工作但无法显示 – 数据必须是一维的

我试图使用 XGBoost 创建模型。 看起来我成功地…

ML Tuning – Cross Validation in Spark

我在https://spark.apache.org/…

如何在React JS中使用fetch从REST API获取预测

我正在开发一个应用程序,其中Flask REST AP…

如何分析ML.NET中多类分类预测得分数组?

我在ML.NET中创建了一个多类分类项目。该项目可以对…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注