我无法在Apache Spark中使用Scala的流模式创建数据框进行在线预测

我是Spark的新手,我想编写一个流程序。我需要预测每一行的数字重复情况。以下是我的原始数据:

05:49:56.604899 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 10202: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [.], seq 3641977583:3641987719, ack 129899328, win 58, options [nop,nop,TS val 432623 ecr 432619], length 1013605:49:56.604908 00:00:00:00:00:03 > 00:00:00:00:00:02, ethertype IPv4 (0x0800), length 66: 10.0.0.3.5001 > 10.0.0.2.54880: Flags [.], ack 10136, win 153, options [nop,nop,TS val 432623 ecr 432623], length 005:49:56.604900 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 4410: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [P.], seq 10136:14480, ack 1, win 58, options [nop,nop,TS val 432623 ecr 432619], length 4344

我编写了一个代码来提取我需要的输出如下所示。(我需要column1和column2的重复次数)

enter image description here

这是我的代码:

然而,我的代码并不是流模式。我编写了另一段代码来实现流模式。因为train.csv文件是以流的方式生成的。但是我遇到了一些错误。这是我的流代码:

import org.apache.spark.SparkConfimport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}import org.apache.spark.sql.Rowimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.util.Try/**  * Created by saeedtkh on 5/24/17.  */object Main_ML_with_Streaming {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("saeed_test").setMaster("local[*]")    //val sc = new SparkContext(conf)    val ssc = new StreamingContext(conf, Seconds(5))    /////////////////////Start extract the packet    val customSchema = StructType(Array(      StructField("column0", StringType, true),      StructField("column1", StringType, true),      StructField("column2", StringType, true)))    val rdd = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv")    val rowRdd =rdd.map(line => line.split(">")).map(array => {      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""      val second = Try(array(1).trim.split(" ")(6)) getOrElse ""      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""      Row.fromSeq(Seq(first, second, third))    })    val dataFrame_trainingData = sqlContext.createDataFrame(rowRdd, customSchema)    dataFrame_trainingData.groupBy("column1","column2").count().show()    /////////////////////end extract the packet    val testData = ssc.textFileStream(/Users/saeedtkh/Desktop/sharedsaeed/test.csv).map(LabeledPoint.parse)    ////////////////////end trainging and testing    val numFeatures = 3    val model = new StreamingLinearRegressionWithSGD()      .setInitialWeights(Vectors.zeros(numFeatures))    model.trainOn(dataFrame_trainingData)    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()    ssc.start()    ssc.awaitTermination()    print("Here is the anwser: *****########*********#########*******222")  }}

问题在于,我无法在代码的这一行中使用sqlContext创建数据框:

val dataFrame_trainingData = sqlContext.createDataFrame(rowRdd, customSchema)

有谁能帮我修改这段代码,使其能够以流的方式工作,并使用线性回归或其他算法预测每一行的重复情况。非常感谢。

更新1:根据第一个回答,我添加了foreach,但错误仍然存在:enter image description here


回答:

首先,需要注意的是,ssc.textFileStream返回的是一个DStream,而不是RDD,所以你命名为rddrowRddtestData的变量实际上不是RDD,而是对连续RDD序列的抽象。因此,你不能将这些传递给createDataFrame,因为它期望的是RDD。

你可以使用DStream.foreachRDD从每个底层的RDD创建一个DataFrame,如这里所述:

rowRdd.foreachRDD { rdd =>  val dataFrame_trainingData = sqlContext.createDataFrame(rdd, customSchema)  // ...} 

然而,你应该注意到,StreamingLinearRegressionWithSGD期望trainOnpredictOnValues的输入是DStreams – 所以你可以直接传递原始的DStreams,而无需将它们转换为DataFrame。

Related Posts

L1-L2正则化的不同系数

我想对网络的权重同时应用L1和L2正则化。然而,我找不…

使用scikit-learn的无监督方法将列表分类成不同组别,有没有办法?

我有一系列实例,每个实例都有一份列表,代表它所遵循的不…

f1_score metric in lightgbm

我想使用自定义指标f1_score来训练一个lgb模型…

通过相关系数矩阵进行特征选择

我在测试不同的算法时,如逻辑回归、高斯朴素贝叶斯、随机…

可以将机器学习库用于流式输入和输出吗?

已关闭。此问题需要更加聚焦。目前不接受回答。 想要改进…

在TensorFlow中,queue.dequeue_up_to()方法的用途是什么?

我对这个方法感到非常困惑,特别是当我发现这个令人费解的…

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注