入口与APIServer一样,基于cobra框架,kube-controller-manager的结构如下:
type CMServer struct {
cmoptions.ControllerManagerServer
}
type ControllerManagerServer struct {
componentconfig.KubeControllerManagerConfiguration
Master string
Kubeconfig string
}
一般来说,kube-controller-manager我们都会直接指定Master或者Kubeconfig参数,可以让Master指向127.0.0.1:6443,这样让kube-controller-manager与kube-apiserver运行在一台机器上。
kube-controller-manager --master=127.0.0.1:8080 --root-ca-file=/data/herry2038/ssl/ca.pem --service-account-private-key-file=/data/herry2038/ssl/kubernetes-key.pem --leader-elect=true --logtostderr=false --log-dir=/data/herry2038/log/controller --v=2 --cluster-signing-cert-file=/data/herry2038/ssl/ca.pem --cluster-signing-key-file=/data/herry2038/ssl/ca-key.pem --feature-gates=RotateKubeletServerCertificate=true
kube-controller-manager --use-service-account-credentials=true --kubeconfig=/etc/kubernetes/controller-manager.conf --service-account-private-key-file=/etc/kubernetes/pki/sa.key --cluster-signing-key-file=/etc/kubernetes/pki/ca.key --address=127.0.0.1 --leader-elect=true --controllers=*,bootstrapsigner,tokencleaner --root-ca-file=/etc/kubernetes/pki/ca.crt --cluster-signing-cert-file=/etc/kubernetes/pki/ca.crt --allocate-node-cidrs=true --cluster-cidr=10.244.0.0/16 --node-cidr-mask-size=24
运行主流程比较清晰,代码在k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go中,注意,kube-controller-manager实现了高可用功能,我们可以同时启动多个进程,它们会进行选举一个leader,由leader来执行相关的功能。当现有leader故障的情况会,选举出新的leader执行任务。
func Run(s *options.CMServer) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
if err := s.Validate(KnownControllers(), ControllersDisabledByDefault.List()); err != nil {
return err
}
// 配置存放到全局配置中,configz实现了一个全局的配置功能,并且可以安装到HTTP服务中对外提供http访问配置的能力
if c, err := configz.New("componentconfig"); err == nil {
c.Set(s.KubeControllerManagerConfiguration)
} else {
glog.Errorf("unable to register configz: %s", err)
}
// 基于配置创建到API Server的连接以及Leader Election
kubeClient, leaderElectionClient, kubeconfig, err := createClients(s)
if err != nil {
return err
}
// 启动HTTP服务
if s.Port >= 0 {
go startHTTP(s) // prof、configz、health、metrics
}
// 创建事件记录器,存储目标:API Server
recorder := createRecorder(kubeClient)
// 注意这里run是一个方法,主要目的是在leader election中,成为leader后才需要运行。follower不需要运行。
run := func(stop <-chan struct{}) {
// 这里用到了ControllerClientBuilder,它是用于创建到API Server的各种连接的构建器。
// 由于在Controller中会有多种到API Server的连接,为了区分它们,就需要基于构建器来完成,
// 在构建时会生成不同的Agent Name,从而能够根据Agent Name进行区分。
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if s.UseServiceAccountCredentials {
if len(s.ServiceAccountKeyFile) == 0 {
// It's possible another controller process is creating the tokens for us.
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
}
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
CoreClient: kubeClient.CoreV1(),
AuthenticationClient: kubeClient.AuthenticationV1(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
// rootClientBuilder 用于sharedInformer与token控制器
ctx, err := CreateControllerContext(s, rootClientBuilder, clientBuilder, stop)
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
// service account token controller有点特殊,它必须要优先启动,为其他controller设置许可权,
// 并且它不能使用普通的client builder,只能使用root client builder。
// 它也不能包含在普通的初始化,必须优先启动。
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
// 启动控制器
if err := StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode)); err != nil {
glog.Fatalf("error starting controllers: %v", err)
}
// 启动Informer工厂实例,在这里会启动各种类型的通知实例,新的资源变更,会源源不断的作为事件传输过来。
ctx.InformerFactory.Start(ctx.Stop)
close(ctx.InformersStarted) // 这时候可以安全启动Informers,然而,Informers其实已经在InformerFactory中启动了。????
select {}
}
if !s.LeaderElection.LeaderElect {
run(wait.NeverStop)
panic("unreachable")
}
id, err := os.Hostname()
if err != nil {
return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// 创建资源锁,资源锁有两种:configmap和endpoints
rl, err := resourcelock.New(s.LeaderElection.ResourceLock,
"kube-system",
"kube-controller-manager",
leaderElectionClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
// 启动leader election
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run, // 成为leader后运行
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
})
panic("unreachable")
}
代码的逻辑比较清晰就不做过多的说明了,它主要的工作是启动各个控制器,其中ServiceAccount需要优先启动,包括对应的资源ServiceAccounts和Secrets的Informer也会优先启动,接着会陆续启动其他资源的控制器。
ServiceAccountToken控制器是第一个启动的控制器,具体的结构名为:TokensController,该控制器有点特殊,它必须要优先启动,为其他controller设置许可权。并且它不能使用普通的client builder,只能使用root client builder,所以它也不能包含在普通的初始化,必须优先启动。
ServiceAccountToken控制器的主要功能:
捕捉两种资源:ServiceAccount和Secrets的变化事件,基于事件中的对象,进行ServiceAccount与Token的关联检查,进行不要的ServiceAccount与Secrets的关联处理和缓存中非法数据的删除操作。
下面是启动ServiceAccountToken控制器的代码:
func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController(ctx ControllerContext) (bool, error) {
if !ctx.IsControllerEnabled(saTokenControllerName) {
glog.Warningf("%q is disabled", saTokenControllerName)
return false, nil
}
if len(ctx.Options.ServiceAccountKeyFile) == 0 {
glog.Warningf("%q is disabled because there is no private key", saTokenControllerName)
return false, nil
}
privateKey, err := certutil.PrivateKeyFromFile(ctx.Options.ServiceAccountKeyFile)
if err != nil {
return true, fmt.Errorf("error reading key for service account token controller: %v", err)
}
var rootCA []byte
if ctx.Options.RootCAFile != "" {
rootCA, err = ioutil.ReadFile(ctx.Options.RootCAFile)
if err != nil {
return true, fmt.Errorf("error reading root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
}
if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
return true, fmt.Errorf("error parsing root-ca-file at %s: %v", ctx.Options.RootCAFile, err)
}
} else {
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}
// 这里创建了一个TokensController实例
controller, err := serviceaccountcontroller.NewTokensController(
ctx.InformerFactory.Core().V1().ServiceAccounts(), // 注意InformerFactory仍然采用通用的client builder
ctx.InformerFactory.Core().V1().Secrets(),
c.rootClientBuilder.ClientOrDie("tokens-controller"),
serviceaccountcontroller.TokensControllerOptions{
TokenGenerator: serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey),
RootCA: rootCA,
},
)
if err != nil {
return true, fmt.Errorf("error creating Tokens controller: %v", err)
}
// 启动控制器
go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)
// start the first set of informers now so that other controllers can start
// 优先启动ServiceAccounts和Secrets
ctx.InformerFactory.Start(ctx.Stop)
return true, nil
}
具体代码的注释基本已经说得比较清楚了,核心在于从InformerFoctory中创建ServiceAccounts和Secrets两个SharedIndexerInformer,侦听器负责把事件放到两个不同队列中。控制器的启动会按需启动多个工作协程去处理队列中的数据。
因为有两种数据,所有两个工作队列:syncServiceAccountQueue和syncSecretQueue,负责处理的协程的函数分别是:
TokensController.syncServiceAccount和TokensController.syncSecret。
syncSecret的处理逻辑比较简单,它的功能主要有两个:
重点逻辑在于同步ServiceAccount的处理函数上:
func (e *TokensController) syncServiceAccount() {
key, quit := e.syncServiceAccountQueue.Get()
if quit {
return
}
defer e.syncServiceAccountQueue.Done(key)
retry := false
defer func() {
e.retryOrForget(e.syncServiceAccountQueue, key, retry)
}()
saInfo, err := parseServiceAccountKey(key)
if err != nil {
glog.Error(err)
return
}
sa, err := e.getServiceAccount(saInfo.namespace, saInfo.name, saInfo.uid, false)
switch {
case err != nil:
glog.Error(err)
retry = true
case sa == nil:
// service account no longer exists, so delete related tokens
glog.V(4).Infof("syncServiceAccount(%s/%s), service account deleted, removing tokens", saInfo.namespace, saInfo.name)
sa = &v1.ServiceAccount{ObjectMeta: metav1.ObjectMeta{Namespace: saInfo.namespace, Name: saInfo.name, UID: saInfo.uid}}
retry, err = e.deleteTokens(sa)
if err != nil {
glog.Errorf("error deleting serviceaccount tokens for %s/%s: %v", saInfo.namespace, saInfo.name, err)
}
default:
// ensure a token exists and is referenced by this service account
retry, err = e.ensureReferencedToken(sa)
if err != nil {
glog.Errorf("error synchronizing serviceaccount %s/%s: %v", saInfo.namespace, saInfo.name, err)
}
}
}
主要逻辑也是当ServiceAccount不存在时删除所有关联的token(secrets),否则确保有必要的token与之关联。
ensureReferencedToken完成了这个核心的功能,它首先确定是否有相应的secrets与之关联,如果有直接返回,否则会尝试创建一个新的Secrets来关联。注意:这个操作会最终作用到API Server中。
所有控制器的启动都是由下面的代码来启动的:
StartControllers(ctx, saTokenControllerInitFunc, NewControllerInitializers(ctx.LoopMode))
其中saTokenCotnrollerInitFunc特殊,所以专门提炼出来,优先执行,其他的控制器预计对应的初始化函数是有方法NewControllerInitializers来完成。
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["nodeipam"] = startNodeIpamController
controllers["route"] = startRouteController
// TODO: volume controller into the IncludeCloudLoops only set.
// TODO: Separate cluster in cloud check from node lifecycle controller.
}
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
return controllers
}
这个方法生成了一个资源名到初始化函数的映射,这里要额外说明的是,对于loopMode==IncludeCloudLoops的循环模型,一般是我们指定CloudProvider=externel的情况下,都是处于这种循环模式,这种情况下,会额外启动service, nodeipam, route这三种资源的控制器,否则,将会在对应的cloud-controller-manager中去处理。
经过上面的初始化后,在StartControllers函数中,会逐个调用启动控制器启动函数,完成各种资源的控制器的启动。
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc) error {
// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
// If this fails, just return here and fail since other controllers won't be able to get credentials.
if _, err := startSATokenController(ctx); err != nil {
return err
}
// Initialize the cloud provider with a reference to the clientBuilder only after token controller
// has started in case the cloud provider uses the client builder.
if ctx.Cloud != nil {
ctx.Cloud.Initialize(ctx.ClientBuilder)
}
// 启动各种资源控制器
for controllerName, initFn := range controllers {
if !ctx.IsControllerEnabled(controllerName) {
glog.Warningf("%q is disabled", controllerName)
continue
}
time.Sleep(wait.Jitter(ctx.Options.ControllerStartInterval.Duration, ControllerStartJitter))
glog.V(1).Infof("Starting %q", controllerName)
started, err := initFn(ctx)
if err != nil {
glog.Errorf("Error starting %q", controllerName)
return err
}
if !started {
glog.Warningf("Skipping %q", controllerName)
continue
}
glog.Infof("Started %q", controllerName)
}
return nil
}
各种资源的控制器的处理模式与ServiceAccount类似,这里就不再逐一分析了,下次碰到关键的资源类型再针对性的研究代码。
作者:何约什
链接:https://www.jianshu.com/p/f3518d739daf