我可以在Spark批处理中创建一个模型,然后在Spark流处理中用于实时处理吗?
我在Apache Spark网站上看到了一些例子,其中训练和预测都是在相同类型的处理上构建的(如线性回归)。
回答:
这里是我刚刚实现的另一种解决方案。
我在Spark批处理中创建了一个模型,假设最终的模型对象名为regmodel。
final LinearRegressionModel regmodel =algorithm.run(JavaRDD.toRDD(parsedData));
Spark上下文的名称为sc,如下所示:
JavaSparkContext sc = new JavaSparkContext(sparkConf);
现在在同一代码中,我使用相同的sc创建了一个Spark流处理上下文
final JavaStreamingContext jssc = new JavaStreamingContext(sc,new Duration(Integer.parseInt(conf.getWindow().trim())));
并进行预测如下:
JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double,Double>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception { Double p = v1.label(); Double q = regmodel.predict(v1.features()); return new Tuple2<Double, Double>(p,q); } });