Spark中的求解器,根据输入的训练数据及设定的迭代次数、正则化项、参数收敛精度等进行迭代求解模型的参数。Spark内部实现来两类求解器,基于随机梯度下降(miniBatch选取样本)的GradientDescent、基于大规模数值优化算法的LBFGS。
在整体架构上,两个类都继承自Optimizer,并需要调用Gradient和Updater
GradientDescent是对随机梯度下降算法封装的一个求解器,通过runMiniBatchSGD方法实现模型参数的迭代计算,基本流程是:
1、根据miniBatchFraction参数进行样本抽样,获得一个小样本集
2、调用Gradient计算在小样本集上的梯度值
3、调用Updater,根据regParam、stepSize、numIterations等参数值更新模型参数
4、判断终止条件(精度收敛或者迭代次数达到上限),否则继续上面步骤。
核心代码如下;
while (!converged && i <= numIterations) {
//将参数广播到各台机器上,实际上是集群下模型参数的共享和同步
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
//在各分区上调用seqOp计算梯度值、误差值
//调用combOp对各分区计算的结果进行聚合
//这样得到的是各分区计算得到的梯度值得总和,后面会利用miniBatchSize计算平均梯度并传入updater进行更新
val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
if (miniBatchSize > 0) {
/**
* lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory += lossSum / miniBatchSize + regVal
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2
previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
i += 1
}
梯度下降法是线性逼近的,在靠近最优点时容易出现震荡(即在最优点旁边晃来晃去,就是取不到这个最优值),相对来说,牛顿法、拟牛顿法、LM方法等二阶优化方法,由于非线性逼近的特性,收敛速度和收敛精度斗会高于梯度下降法,但这些方法都是相当消耗内存的,比如牛顿法,需要计算Hessian矩阵(二阶偏导数矩阵)及其逆矩阵 ,如果模型参数是N维,则Hessian矩阵大小为N*N,存储和计算逆矩阵斗相当困难,限制来在大规模机器学习中的应用。基于牛顿法和拟牛顿法,LBFGS在计算Hessian矩阵逆矩阵的时候做了一些近似工作。
LBFGS详细可参考http://mlworks.cn/posts/introduction-to-l-bfgs/
Spark本身没有实现LBFGS底层算法,而是调用来breeze包,Spark实现了损失函数CostFun,使用CachedDiffFunction类缓存最近的m次输入变量和梯度变量的差值。
private class CostFun(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
regParam: Double,
numExamples: Long) extends DiffFunction[BDV[Double]] {
override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
// Have a local copy to avoid the serialization of CostFun object which is not serializable.
val w = Vectors.fromBreeze(weights)
val n = w.size
val bcW = data.context.broadcast(w)
val localGradient = gradient
val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = localGradient.compute(
features, label, bcW.value, grad)
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
axpy(1.0, grad2, grad1)
(grad1, loss1 + loss2)
})
// broadcasted model is not needed anymore
bcW.destroy()
val regVal = updater.compute(w, Vectors.zeros(n), 0, 1, regParam)._2
val loss = lossSum / numExamples + regVal
val gradientTotal = w.copy
axpy(-1.0, updater.compute(w, Vectors.zeros(n), 1, 1, regParam)._1, gradientTotal)
axpy(1.0 / numExamples, gradientSum, gradientTotal)
(loss, gradientTotal.asBreeze.asInstanceOf[BDV[Double]])
}
}
def runLBFGS(
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
numCorrections: Int,
convergenceTol: Double,
maxNumIterations: Int,
regParam: Double,
initialWeights: Vector): (Vector, Array[Double]) = {
val lossHistory = mutable.ArrayBuilder.make[Double]
val numExamples = data.count()
val costFun =
new CostFun(data, gradient, updater, regParam, numExamples)
val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
val states =
lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.asBreeze.toDenseVector)
var state = states.next()
while (states.hasNext) {
lossHistory += state.value
state = states.next()
}
lossHistory += state.value
val weights = Vectors.fromBreeze(state.x)
val lossHistoryArray = lossHistory.result()
(weights, lossHistoryArray)
}
再回头看LBFGS几个内部参数就简单明了,numCorrections即控制输入变量和梯度变量的差值要缓存最近的多少次,convergenceTol即收敛精度。
分布式机器学习思想主要体现在
1、模型参数的共享和同步(使用Spark broadcast机制实现)。
2、分布式的梯度求解,先求解各分区上单个样本的梯度值,再聚合得到总的梯度值(通过treeAggregate算子实现)。之后在Drvie端调用updater来更新模型参数。