我想使用Flink-HBase插件来读取数据,这些数据将作为Flink机器学习算法的输入,具体来说是SVM和多元线性回归(MLR)。目前,我先将提取的数据写入临时文件,然后通过libSVM方法读取它,但我认为应该有更高级的方法。
你有代码片段或想法可以分享吗?
回答:
没有必要将数据写入磁盘然后再用MLUtils.readLibSVM
读取。原因如下。
MLUtils.readLibSVM
期望一个文本文件,每行是一个稀疏特征向量及其关联的标签。它使用以下格式来表示标签-特征向量对:
<line> .=. <label> <feature>:<value> <feature>:<value> ... <feature>:<value> # <info>
其中<feature>
是特征向量中后续value
的索引。MLUtils.readLibSVM
可以读取这种格式的文件,并将每一行转换为LabeledVector
实例。因此,读取libSVM文件后,你会得到一个DataSet[LabeledVector]
,这正是SVM
和MultipleLinearRegression
预测器所需的输入格式。
然而,根据你从HBase获取的数据格式,你首先需要将数据转换为libSVM
格式。否则,MLUtils.readLibSVM
将无法读取写入的文件。如果你转换了数据,那么你也可以直接将数据转换为DataSet[LabeledVector]
,并将其用作Flink的ML算法的输入。这避免了不必要的磁盘操作周期。
如果你从HBase获得一个DataSet[String]
,其中每个字符串都具有libSVM
格式(见上面的规格),那么你可以对HBase的DataSet
应用以下map
操作。
val hbaseInput: DataSet[String] = ...val labelCOODS = hbaseInput.flatMap { line => // 移除所有以'#'开头的注释 val commentFreeLine = line.takeWhile(_ != '#').trim if(commentFreeLine.nonEmpty) { val splits = commentFreeLine.split(' ') val label = splits.head.toDouble val sparseFeatures = splits.tail val coos = sparseFeatures.map { str => val pair = str.split(':') require( pair.length == 2, "每个特征条目必须具有<feature>:<value>的形式") // libSVM索引是从1开始的,但我们期望它是从0开始的 val index = pair(0).toInt - 1 val value = pair(1).toDouble (index, value) } Some((label, coos)) } else { None }// 计算向量的最大维度val dimensionDS = labelCOODS.map { labelCOO => labelCOO._2.map( _._1 + 1 ).max}.reduce(scala.math.max(_, _))val labeledVectors: DataSet[LabeledVector] = labelCOODS.map{ new RichMapFunction[(Double, Array[(Int, Double)]), LabeledVector] { var dimension = 0 override def open(configuration: Configuration): Unit = { dimension = getRuntimeContext.getBroadcastVariable(DIMENSION).get(0) } override def map(value: (Double, Array[(Int, Double)])): LabeledVector = { new LabeledVector(value._1, SparseVector.fromCOO(dimension, value._2)) }}}.withBroadcastSet(dimensionDS, DIMENSION)
这将把你的libSVM格式数据转换为LabeledVectors
的数据集。