在 Kubernetes 中,Controller Manager、Scheduler 等组件以及用户实现的 Controller,都是通过多副本的方式来实现高可用。但多副本 Controller 同时工作难免会引发所监听资源的竞争条件,所以通常多副本之间只有一个副本在工作。
为了避免这种竞争条件,Kubernetes 提供了 Leader 选举的模式,多副本之间相互竞争 Leader,只有成为 Leader 才工作,否则一直等待。本文将从 Leader 选举的原理以及作为用户如何使用等方面,介绍如何在 Kubernetes 中实现组件的高可用。
Leader 选举
Leader 选举的原理主要是利用 Lease、ConfigMap、Endpoint 资源实现乐观锁,Lease 资源中定义了 Leader 的 id、抢占时间等信息;ConfigMap 和 Endpoint 在其 annotation 中定义 control-plane.alpha.kubernetes.io/leader
为 leader。是的,没错,如果我们自己实现,随便定义自己的喜欢的字段也行,这里其实是利用了 resourceVersion 来实现的乐观锁。
原理如下图所示,多个副本之间会竞争同一个资源,抢占到了锁就成为 Leader,并定期更新;抢占不到则原地等待,不断尝试抢占。
client-go
中提供了锁的工具方法,k8s 的组件也是直接通过 client-go
来使用的。接下来我们来分析 client-go
提供的工具方法如何实现 Leader 选举。
抢占锁
首先会根据定义的名称获取锁,没有则创建;随后判断当前锁有没有 Leader 以及 Leader 的租期是否到期,没有则抢占锁,否则返回并等待。
其中,抢占锁的过程势必会存在 update 资源的操作,而 k8s 通过版本号的乐观锁实现了 update 操作的原子性。在 update 资源时,ApiServer 会对比 resourceVersion,如果不一致将返回冲突错误。通过这种方式,update 操作的安全性就得到了保证。
抢占锁的代码如下:
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1. obtain or create the ElectionRecord
oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
if err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.setObservedRecord(&leaderElectionRecord)
return true
}
// 2. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. We're going to try to update. The leaderElectionRecord is set to it's default
// here. Let's correct it before updating.
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// update the lock itself
if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
klog.Errorf("Failed to update lock: %v", err)
return false
}
le.setObservedRecord(&leaderElectionRecord)
return true
}
client-go
仓库提供了一个 example
,我们启动一个进程后,可以看到其 Lease 信息如下:
$ kubectl get lease demo -oyaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
...
spec:
acquireTime: "2022-07-23T14:28:41.381108Z"
holderIdentity: "1"
leaseDurationSeconds: 60
leaseTransitions: 0
renewTime: "2022-07-23T14:28:41.397199Z"
释放锁
释放锁的逻辑是在 Leader 退出前,也是执行 update 操作,将 Lease 的 leader 信息清空。
func (le *LeaderElector) release() bool {
if !le.IsLeader() {
return true
}
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
LeaderTransitions: le.observedRecord.LeaderTransitions,
LeaseDurationSeconds: 1,
RenewTime: now,
AcquireTime: now,
}
if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
klog.Errorf("Failed to release lock: %v", err)
return false
}
le.setObservedRecord(&leaderElectionRecord)
return true
}
将上一步启动的进程 kill 后,再看其 Lease 信息:
$ kubectl get lease demo -oyaml
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
...
spec:
acquireTime: "2022-07-23T14:29:26.557658Z"
holderIdentity: ""
leaseDurationSeconds: 1
leaseTransitions: 0
renewTime: "2022-07-23T14:29:26.557658Z"
Controller 中如何使用
我们在实现自己的 Controller 的时候,通常是使用 controller runtime 工具,而 controller runtime 早已将 Leader 选举的逻辑做好了封装。
主要逻辑在两处,一是 Lease 基础信息的定义,根据用户的定义补充基础信息,如当前运行的 namespace 作为 leader 的 namespace、根据 host 生成随机的 id 等。
func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, options Options) (resourcelock.Interface, error) {
if options.LeaderElectionResourceLock == "" {
options.LeaderElectionResourceLock = resourcelock.LeasesResourceLock
}
// LeaderElectionID must be provided to prevent clashes
if options.LeaderElectionID == "" {
return nil, errors.New("LeaderElectionID must be configured")
}
// Default the namespace (if running in cluster)
if options.LeaderElectionNamespace == "" {
var err error
options.LeaderElectionNamespace, err = getInClusterNamespace()
if err != nil {
return nil, fmt.Errorf("unable to find leader election namespace: %w", err)
}
}
// Leader id, needs to be unique
id, err := os.Hostname()
if err != nil {
return nil, err
}
id = id + "_" + string(uuid.NewUUID())
// Construct clients for leader election
rest.AddUserAgent(config, "leader-election")
corev1Client, err := corev1client.NewForConfig(config)
if err != nil {
return nil, err
}
coordinationClient, err := coordinationv1client.NewForConfig(config)
if err != nil {
return nil, err
}
return resourcelock.New(options.LeaderElectionResourceLock,
options.LeaderElectionNamespace,
options.LeaderElectionID,
corev1Client,
coordinationClient,
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorderProvider.GetEventRecorderFor(id),
})
}
二是启动 leader 选举,注册 lock 信息、租期时间、callback 函数等信息,再启动选举进程:
func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline,
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
return
}
close(cm.elected)
},
OnStoppedLeading: func() {
if cm.onStoppedLeading != nil {
cm.onStoppedLeading()
}
cm.gracefulShutdownTimeout = time.Duration(0)
cm.errChan <- errors.New("leader election lost")
},
},
ReleaseOnCancel: cm.leaderElectionReleaseOnCancel,
})
if err != nil {
return err
}
// Start the leader elector process
go func() {
l.Run(ctx)
<-ctx.Done()
close(cm.leaderElectionStopped)
}()
return nil
}
有了 controller runtime 对选举逻辑的包装,我们在使用的时候,就方便很多。根据 Controller Runtime 的使用姿势 一文的介绍,我们可以在初始化 Controller 的时候,定义 Lease 的信息:
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
// 1. init Manager
mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Port: 9443,
LeaderElection: true,
LeaderElectionID: "demo.xxx",
})
// 2. init Reconciler(Controller)
_ = ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(&ApplicationReconciler{})
...
只要在初始化时,加入 LeaderElection: true
,以及 LeaderElectionID
,即 Lease 的 name,保证集群内唯一即可。其他的信息 controller runtime 都会帮你填充。
总结
在生产环境中,高可用是一个很重要的功能,没有高可用的服务没人敢上生产。Kubernetes 基于 etcd 的 modifiedindex 实现了 resourceVersion 的乐观锁,通过这个乐观锁,Leader 选举机制才能够被多副本使用,避免竞争条件。我们在实现自己的 Controller 的时候只需要巧妙利用这一机制,就可以轻松实现高可用。