我目前正在尝试创建一种监控解决方案 – 一些数据被写入Kafka,我使用Spark Streaming读取这些数据并进行处理。
为了对数据进行预处理,以便用于机器学习和异常检测,我希望根据一些过滤参数来拆分流。迄今为止,我了解到DStream本身无法拆分为多个流。
我主要面临的问题是,许多算法(如KMeans)只接受连续数据,而不接受离散数据,例如URL或其他字符串。
我的理想需求是:
- 从Kafka读取数据,并根据读取的内容生成一个字符串列表
- 基于该字符串列表生成多个流 – (拆分流、过滤流,或任何最佳实践)
- 使用这些流为每个流训练不同的模型以获得基线,并随后将所有后续数据与基线进行比较
我很乐意获得任何关于如何解决我的问题的建议。我无法想象这种场景在Spark中没有覆盖 – 然而,到目前为止,我还没有发现一个有效的解决方案。
回答:
我认为从原始流中创建派生DStream,使用filter和map应该就足够了:
val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))
请注意,这些filter
和map
步骤可以合并为一个collect
步骤(不要与无参数的RDD.collect混淆,后者会将数据传输到驱动程序!!!)
val featuresStream = originalDStream.transform(rdd => rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)})streamingKMeans.trainOn(featuresStream)
我们还可以有一个动态的过滤DStream集合,存储在某个集合中。这里我们使用一个包含用于过滤的键的地图:
originalDStream.cache() // 当一个DStream被分支时,这对于性能非常重要。// filterKeys: Set[String] val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))// 对这个结构中的不同DStream做一些处理 ...
这些代码片段是需要完成实际逻辑的代码示例。