您的当前位置:首页正文

spark.mllib源码阅读-优化算法3-Optimizer

2024-11-10 来源:个人技术集锦

Spark中的求解器,根据输入的训练数据及设定的迭代次数、正则化项、参数收敛精度等进行迭代求解模型的参数。Spark内部实现来两类求解器,基于随机梯度下降(miniBatch选取样本)的GradientDescent、基于大规模数值优化算法的LBFGS。

在整体架构上,两个类都继承自Optimizer,并需要调用Gradient和Updater

GradientDescent

GradientDescent是对随机梯度下降算法封装的一个求解器,通过runMiniBatchSGD方法实现模型参数的迭代计算,基本流程是:

1、根据miniBatchFraction参数进行样本抽样,获得一个小样本集

2、调用Gradient计算在小样本集上的梯度值

3、调用Updater,根据regParam、stepSize、numIterations等参数值更新模型参数

4、判断终止条件(精度收敛或者迭代次数达到上限),否则继续上面步骤。

核心代码如下;

while (!converged && i <= numIterations) {
  //将参数广播到各台机器上,实际上是集群下模型参数的共享和同步
  val bcWeights = data.context.broadcast(weights)
  // Sample a subset (fraction miniBatchFraction) of the total data
  // compute and sum up the subgradients on this subset (this is one map-reduce)
  //在各分区上调用seqOp计算梯度值、误差值
  //调用combOp对各分区计算的结果进行聚合
  //这样得到的是各分区计算得到的梯度值得总和,后面会利用miniBatchSize计算平均梯度并传入updater进行更新
  val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
    .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
      seqOp = (c, v) => {
        // c: (grad, loss, count), v: (label, features)
        val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
        (c._1, c._2 + l, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // c: (grad, loss, count)
        (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
      })

  if (miniBatchSize > 0) {
    /**
     * lossSum is computed using the weights from the previous iteration
     * and regVal is the regularization value computed in the previous iteration as well.
     */
    stochasticLossHistory += lossSum / miniBatchSize + regVal
    val update = updater.compute(
      weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
      stepSize, i, regParam)
    weights = update._1
    regVal = update._2

    previousWeights = currentWeights
    currentWeights = Some(weights)
    if (previousWeights != None && currentWeights != None) {
      converged = isConverged(previousWeights.get,
        currentWeights.get, convergenceTol)
    }
  } else {
    logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
  }
  i += 1
}

LBFGS

梯度下降法是线性逼近的,在靠近最优点时容易出现震荡(即在最优点旁边晃来晃去,就是取不到这个最优值),相对来说,牛顿法、拟牛顿法、LM方法等二阶优化方法,由于非线性逼近的特性,收敛速度和收敛精度斗会高于梯度下降法,但这些方法都是相当消耗内存的,比如牛顿法,需要计算Hessian矩阵(二阶偏导数矩阵)及其逆矩阵 ,如果模型参数是N维,则Hessian矩阵大小为N*N,存储和计算逆矩阵斗相当困难,限制来在大规模机器学习中的应用。基于牛顿法和拟牛顿法,LBFGS在计算Hessian矩阵逆矩阵的时候做了一些近似工作。

LBFGS详细可参考http://mlworks.cn/posts/introduction-to-l-bfgs/

Spark本身没有实现LBFGS底层算法,而是调用来breeze包,Spark实现了损失函数CostFun,使用CachedDiffFunction类缓存最近的m次输入变量和梯度变量的差值。

private class CostFun(
  data: RDD[(Double, Vector)],
  gradient: Gradient,
  updater: Updater,
  regParam: Double,
  numExamples: Long) extends DiffFunction[BDV[Double]] {

  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
    // Have a local copy to avoid the serialization of CostFun object which is not serializable.
    val w = Vectors.fromBreeze(weights)
    val n = w.size
    val bcW = data.context.broadcast(w)
    val localGradient = gradient
    val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
        seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
          val l = localGradient.compute(
            features, label, bcW.value, grad)
          (grad, loss + l)
        },
        combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
          axpy(1.0, grad2, grad1)
          (grad1, loss1 + loss2)
        })
    // broadcasted model is not needed anymore
    bcW.destroy()
    val regVal = updater.compute(w, Vectors.zeros(n), 0, 1, regParam)._2
    val loss = lossSum / numExamples + regVal
    val gradientTotal = w.copy
    axpy(-1.0, updater.compute(w, Vectors.zeros(n), 1, 1, regParam)._1, gradientTotal)
    axpy(1.0 / numExamples, gradientSum, gradientTotal)
    (loss, gradientTotal.asBreeze.asInstanceOf[BDV[Double]])
  }
}

def runLBFGS(
    data: RDD[(Double, Vector)],
    gradient: Gradient,
    updater: Updater,
    numCorrections: Int,
    convergenceTol: Double,
    maxNumIterations: Int,
    regParam: Double,
    initialWeights: Vector): (Vector, Array[Double]) = {
  val lossHistory = mutable.ArrayBuilder.make[Double]
  val numExamples = data.count()
  val costFun =
    new CostFun(data, gradient, updater, regParam, numExamples)
  val lbfgs = new BreezeLBFGS[BDV[Double]](maxNumIterations, numCorrections, convergenceTol)
  val states =
    lbfgs.iterations(new CachedDiffFunction(costFun), initialWeights.asBreeze.toDenseVector)
  var state = states.next()
  while (states.hasNext) {
    lossHistory += state.value
    state = states.next()
  }
  lossHistory += state.value
  val weights = Vectors.fromBreeze(state.x)
  val lossHistoryArray = lossHistory.result()
  (weights, lossHistoryArray)
}

再回头看LBFGS几个内部参数就简单明了,numCorrections即控制输入变量和梯度变量的差值要缓存最近的多少次,convergenceTol即收敛精度。

 

分布式机器学习思想主要体现在

1、模型参数的共享和同步(使用Spark broadcast机制实现)。

2、分布式的梯度求解,先求解各分区上单个样本的梯度值,再聚合得到总的梯度值(通过treeAggregate算子实现)。之后在Drvie端调用updater来更新模型参数。

Top