您的当前位置:首页正文

Kubernetes 源码分析 -- controller-manager总流程

2024-11-09 来源:个人技术集锦

程序入口

入口与APIServer一样,基于cobra框架,kube-controller-manager的结构如下:

type CMServer struct {
    cmoptions.ControllerManagerServer
}

type ControllerManagerServer struct {
    componentconfig.KubeControllerManagerConfiguration

    Master     string
    Kubeconfig string
}

一般来说,kube-controller-manager我们都会直接指定Master或者Kubeconfig参数,可以让Master指向127.0.0.1:6443,这样让kube-controller-manager与kube-apiserver运行在一台机器上。

  • 指定master地址的启动脚本

kube-controller-manager --master=127.0.0.1:8080 --root-ca-file=/data/herry2038/ssl/ca.pem --service-account-private-key-file=/data/herry2038/ssl/kubernetes-key.pem --leader-elect=true --logtostderr=false --log-dir=/data/herry2038/log/controller --v=2 --cluster-signing-cert-file=/data/herry2038/ssl/ca.pem --cluster-signing-key-file=/data/herry2038/ssl/ca-key.pem --feature-gates=RotateKubeletServerCertificate=true

  • 指定kubeconfig的启动脚本

kube-controller-manager --use-service-account-credentials=true --kubeconfig=/etc/kubernetes/controller-manager.conf --service-account-private-key-file=/etc/kubernetes/pki/sa.key --cluster-signing-key-file=/etc/kubernetes/pki/ca.key --address=127.0.0.1 --leader-elect=true --controllers=*,bootstrapsigner,tokencleaner --root-ca-file=/etc/kubernetes/pki/ca.crt --cluster-signing-cert-file=/etc/kubernetes/pki/ca.crt --allocate-node-cidrs=true --cluster-cidr=10.244.0.0/16 --node-cidr-mask-size=24

运行主流程

运行主流程比较清晰,代码在k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go中,注意,kube-controller-manager实现了高可用功能,我们可以同时启动多个进程,它们会进行选举一个leader,由leader来执行相关的功能。当现有leader故障的情况会,选举出新的leader执行任务。

func Run(s *options.CMServer) error {
    // To help debugging, immediately log version
    glog.Infof("Version: %+v", version.Get())
    if err := s.Validate(KnownControllers(), ControllersDisabledByDefault.List()); err != nil {
        return err
    }
    // 配置存放到全局配置中,configz实现了一个全局的配置功能,并且可以安装到HTTP服务中对外提供http访问配置的能力
    if c, err := configz.New("componentconfig"); err == nil {
        c.Set(s.KubeControllerManagerConfiguration)
    } else {
        glog.Errorf("unable to register configz: %s", err)
    }
    // 基于配置创建到API Server的连接以及Leader Election
    kubeClient, leaderElectionClient, kubeconfig, err := createClients(s)
    if err != nil {
        return err
    }
    // 启动HTTP服务
    if s.Port >= 0 {
        go startHTTP(s) // prof、configz、health、metrics
    }
    // 创建事件记录器,存储目标:API Server
    recorder := createRecorder(kubeClient)

    // 注意这里run是一个方法,主要目的是在leader election中,成为leader后才需要运行。follower不需要运行。
    run := func(stop <-chan struct{}) {
        // 这里用到了ControllerClientBuilder,它是用于创建到API Server的各种连接的构建器。
        // 由于在Controller中会有多种到API Server的连接,为了区分它们,就需要基于构建器来完成,
        // 在构建时会生成不同的Agent Name,从而能够根据Agent Name进行区分。
        rootClientBuilder := controller.SimpleControllerClientBuilder{
            ClientConfig: kubeconfig,
        }
        var clientBuilder controller.ControllerClientBuilder
        if s.UseServiceAccountCredentials {
            if len(s.ServiceAccountKeyFile) == 0 {
                // It's possible another controller process is creating the tokens for us.
                // If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
                glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
            }
            clientBuilder = controller.SAControllerClientBuilder{
                ClientConfig:         restclient.AnonymousClientConfig(kubeconfig),
                CoreClient:           kubeClient.CoreV1(),
                AuthenticationClient: kubeClient.AuthenticationV1(),
                Namespace:            "kube-system",
            }
        } else {
            clientBuilder = rootClientBuilder
        }
        // rootClientBuilder 用于sharedInformer与token控制器
        ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
        if err != nil {
            glog.Fatalf("error building controller context: %v", err)
        }
        //  service account token controller有点特殊,它必须要优先启动,为其他controller设置许可权,
        //  并且它不能使用普通的client builder,只能使用root client builder。
        //  它也不能包含在普通的初始化,必须优先启动。
        saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

        // 启动控制器
        if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
            glog.Fatalf("error starting controllers: %v", err)
        }

        // 启动Informer工厂实例,在这里会启动各种类型的通知实例,新的资源变更,会源源不断的作为事件传输过来。
        ctx.InformerFactory.Start(ctx.Stop)
        close(ctx.InformersStarted) // 这时候可以安全启动Informers,然而,Informers其实已经在InformerFactory中启动了。????

        select {}
    }

    if !s.LeaderElection.LeaderElect {
        run(wait.NeverStop)
        panic("unreachable")
    }

    id, err := os.Hostname()
    if err != nil {
        return err
    }
    // add a uniquifier so that two processes on the same host don't accidentally both become active
    id = id + "_" + string(uuid.NewUUID())
    // 创建资源锁,资源锁有两种:configmap和endpoints
    rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
        "kube-system",
        "kube-controller-manager",
        leaderElectionClient.CoreV1(),
        resourcelock.ResourceLockConfig{
            Identity:      id,
            EventRecorder: recorder,
        })
    if err != nil {
        glog.Fatalf("error creating lock: %v", err)
    }
    // 启动leader election
    leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   s.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: run,  // 成为leader后运行
            OnStoppedLeading: func() {
                glog.Fatalf("leaderelection lost")
            },
        },
    })
    panic("unreachable")
}

代码的逻辑比较清晰就不做过多的说明了,它主要的工作是启动各个控制器,其中ServiceAccount需要优先启动,包括对应的资源ServiceAccounts和Secrets的Informer也会优先启动,接着会陆续启动其他资源的控制器。

ServiceAccount控制器

ServiceAccountToken控制器是第一个启动的控制器,具体的结构名为:TokensController,该控制器有点特殊,它必须要优先启动,为其他controller设置许可权。并且它不能使用普通的client builder,只能使用root client builder,所以它也不能包含在普通的初始化,必须优先启动。

ServiceAccountToken控制器的主要功能:
捕捉两种资源:ServiceAccount和Secrets的变化事件,基于事件中的对象,进行ServiceAccount与Token的关联检查,进行不要的ServiceAccount与Secrets的关联处理和缓存中非法数据的删除操作。

下面是启动ServiceAccountToken控制器的代码:

func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
    if !ctx.IsControllerEnabled(saTokenControllerName) {
        glog.Warningf("%q is disabled", saTokenControllerName)
        return false, nil
    }

    if len(ctx.Options.ServiceAccountKeyFile) == 0 {
        glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
        return false, nil
    }
    privateKey, err := certutil.PrivateKeyFromFile(ctx.Options.ServiceAccountKeyFile)
    if err != nil {
        return true, fmt.Errorf("error reading key for service account token controller: %v", err)
    }

    var rootCA []byte
    if ctx.Options.RootCAFile != "" {
        rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
        if err != nil {
            return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
        }
        if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
            return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
        }
    } else {
        rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
    }
    // 这里创建了一个TokensController实例
    controller, err := serviceaccountcontroller.NewTokensController(
        ctx.InformerFactory.Core().V1().ServiceAccounts(),  // 注意InformerFactory仍然采用通用的client builder
        ctx.InformerFactory.Core().V1().Secrets(),
        c.rootClientBuilder.ClientOrDie("tokens-controller"),
        serviceaccountcontroller.TokensControllerOptions{
            TokenGenerator: serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey),
            RootCA:         rootCA,
        },
    )
    if err != nil {
        return true, fmt.Errorf("error creating Tokens controller: %v", err)
    }
    // 启动控制器
    go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)

    // start the first set of informers now so that other controllers can start
    // 优先启动ServiceAccounts和Secrets
    ctx.InformerFactory.Start(ctx.Stop)

    return true, nil
}

具体代码的注释基本已经说得比较清楚了,核心在于从InformerFoctory中创建ServiceAccounts和Secrets两个SharedIndexerInformer,侦听器负责把事件放到两个不同队列中。控制器的启动会按需启动多个工作协程去处理队列中的数据。
因为有两种数据,所有两个工作队列:syncServiceAccountQueue和syncSecretQueue,负责处理的协程的函数分别是:
TokensController.syncServiceAccount和TokensController.syncSecret。

syncSecret的处理逻辑比较简单,它的功能主要有两个:

  • 对于不存在的secret,删除该secret到ServiceAccount的关联
  • 对于存在的secret,则确保该token(Secret)与ServiceAccount建立了关联,如果没有,则删除该Secret

重点逻辑在于同步ServiceAccount的处理函数上:

func (e *TokensController) syncServiceAccount() {
    key, quit := e.syncServiceAccountQueue.Get()
    if quit {
        return
    }
    defer e.syncServiceAccountQueue.Done(key)

    retry := false
    defer func() {
        e.retryOrForget(e.syncServiceAccountQueue, key, retry)
    }()

    saInfo, err := parseServiceAccountKey(key)
    if err != nil {
        glog.Error(err)
        return
    }

    sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
    switch {
    case err != nil:
        glog.Error(err)
        retry = true
    case sa == nil:
        // service account no longer exists, so delete related tokens
        glog.V(4).Infof("syncServiceAccount(%s/%s), service account deleted, removing tokens", saInfo.namespace, saInfo.name)
        sa = &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
        retry, err = e.deleteTokens(sa)
        if err != nil {
            glog.Errorf("error deleting serviceaccount tokens for %s/%s: %v", saInfo.namespace, saInfo.name, err)
        }
    default:
        // ensure a token exists and is referenced by this service account
        retry, err = e.ensureReferencedToken(sa)
        if err != nil {
            glog.Errorf("error synchronizing serviceaccount %s/%s: %v", saInfo.namespace, saInfo.name, err)
        }
    }
}

主要逻辑也是当ServiceAccount不存在时删除所有关联的token(secrets),否则确保有必要的token与之关联。
ensureReferencedToken完成了这个核心的功能,它首先确定是否有相应的secrets与之关联,如果有直接返回,否则会尝试创建一个新的Secrets来关联。注意:这个操作会最终作用到API Server中。

其他资源控制器的启动

所有控制器的启动都是由下面的代码来启动的:
StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode))
其中saTokenCotnrollerInitFunc特殊,所以专门提炼出来,优先执行,其他的控制器预计对应的初始化函数是有方法NewControllerInitializers来完成。

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    controllers["endpoint"] = startEndpointController
    controllers["replicationcontroller"] = startReplicationController
    controllers["podgc"] = startPodGCController
    controllers["resourcequota"] = startResourceQuotaController
    controllers["namespace"] = startNamespaceController
    controllers["serviceaccount"] = startServiceAccountController
    controllers["garbagecollector"] = startGarbageCollectorController
    controllers["daemonset"] = startDaemonSetController
    controllers["job"] = startJobController
    controllers["deployment"] = startDeploymentController
    controllers["replicaset"] = startReplicaSetController
    controllers["horizontalpodautoscaling"] = startHPAController
    controllers["disruption"] = startDisruptionController
    controllers["statefulset"] = startStatefulSetController
    controllers["cronjob"] = startCronJobController
    controllers["csrsigning"] = startCSRSigningController
    controllers["csrapproving"] = startCSRApprovingController
    controllers["csrcleaner"] = startCSRCleanerController
    controllers["ttl"] = startTTLController
    controllers["bootstrapsigner"] = startBootstrapSignerController
    controllers["tokencleaner"] = startTokenCleanerController
    if loopMode == IncludeCloudLoops {
        controllers["service"] = startServiceController
        controllers["nodeipam"] = startNodeIpamController
        controllers["route"] = startRouteController
        // TODO: volume controller into the IncludeCloudLoops only set.
        // TODO: Separate cluster in cloud check from node lifecycle controller.
    }
    controllers["nodelifecycle"] = startNodeLifecycleController
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController

    return controllers
}

这个方法生成了一个资源名到初始化函数的映射,这里要额外说明的是,对于loopMode==IncludeCloudLoops的循环模型,一般是我们指定CloudProvider=externel的情况下,都是处于这种循环模式,这种情况下,会额外启动service, nodeipam, route这三种资源的控制器,否则,将会在对应的cloud-controller-manager中去处理。

经过上面的初始化后,在StartControllers函数中,会逐个调用启动控制器启动函数,完成各种资源的控制器的启动。

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
    // Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
    // If this fails, just return here and fail since other controllers won't be able to get credentials.
    if _, err := startSATokenController(ctx); err != nil {
        return err
    }

    // Initialize the cloud provider with a reference to the clientBuilder only after token controller
    // has started in case the cloud provider uses the client builder.
    if ctx.Cloud != nil {
        ctx.Cloud.Initialize(ctx.ClientBuilder)
    }
        // 启动各种资源控制器
    for controllerName, initFn := range controllers {
        if !ctx.IsControllerEnabled(controllerName) {
            glog.Warningf("%q is disabled", controllerName)
            continue
        }

        time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))

        glog.V(1).Infof("Starting %q", controllerName)
        started, err := initFn(ctx)
        if err != nil {
            glog.Errorf("Error starting %q", controllerName)
            return err
        }
        if !started {
            glog.Warningf("Skipping %q", controllerName)
            continue
        }
        glog.Infof("Started %q", controllerName)
    }

    return nil
}

各种资源的控制器的处理模式与ServiceAccount类似,这里就不再逐一分析了,下次碰到关键的资源类型再针对性的研究代码。



作者:何约什
链接:https://www.jianshu.com/p/f3518d739daf

Top