Controller Runtime 的四种使用姿势

2022/04/16 00:32 上午 posted in  Kubernetes

随着云原生生态的不断发展,目前大多数基于 Kubernetes 的云原生技术,几乎都采用了 CRD + Controller 的模式。即使没有自定义 CRD,也会有需要 Controller 来检测自己感兴趣的资源,在其状态发生变更时,做一些业务所需工作。

controller-runtime 是 Kubernetes 社区提供的相对较好用的能够快速搭建一套对 ApiServer 进行 watch 的工具。本文会对 controller-runtime 的工作原理及不同场景下的使用方法做一个简单的总结和介绍。

架构

controller-runtime 的架构可以用下图概括。注:Webhook 不在本文讨论范围内,故图中舍去了 Webhook。

主要分为用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller。先看用户侧的,Manager 是用户初始化的时候需要创建的,用来启动 Controller Runtime 的组件;Reconciler 是用户自己需要提供的组件,用于处理自己的业务逻辑。

而 controller-runtime 侧的组件,Cache 顾名思义就是缓存,用于建立 Informer 对 ApiServer 进行连接 watch 资源,并将 watch 到的 object 推入队列;Controller 一方面会向 Informer 注册 eventHandler,另一方面会从队列中拿数据并执行用户侧 Reconciler 的函数。

controller-runtime 侧整个工作流程如下:

首先 Controller 会先向 Informer 注册特定资源的 eventHandler;然后 Cache 会启动 Informer,Informer 向 ApiServer 发出请求,建立连接;当 Informer 检测到有资源变动后,使用 Controller 注册进来的 eventHandler 判断是否推入队列中;当队列中有元素被推入时,Controller 会将元素取出,并执行用户侧的 Reconciler。

用法

下面介绍几种不同场景下的使用方法。

一般用法

controller-runtime 的用法我们已经很熟悉了,最简单的用法可以用下面的代码表达:

func start() {
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)
	// 1. init Manager
	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		Port:   9443,
	})
	// 2. init Reconciler(Controller)
	_ = ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		Complete(&ApplicationReconciler{})

	// 3. start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	}
}

type ApplicationReconciler struct {
}

func (a ApplicationReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	return reconcile.Result{}, nil
}

第一步即初始化 Manager,同时生成一个默认配置的 Cache。

第二步是初始化 Controller。

  • ctrl.NewControllerManagedBy:用于创建 Controller,同时将第一步生成的 Manager 的一些配置注入到 Controller 中;
  • For:Controller Runtime 提供的快捷方法,用来指定 watch 的资源类型;
  • Owns:有时候也会用到 Owns 方法,表示某资源是我关心资源的从属,其 event 也会进去 Controller 的队列中;
  • Complete 也是一种快捷方法,用于生成 Controller,将用户的 Reconciler 注册进 Controller,并生成 watch 资源的默认 eventHandler,同时执行 Controller 的 watch 函数;

用户的 Reconciler 只需要实现 reconcile.Reconciler 接口即可。

最后一步就是启动 Manager,这一步中会同时启动 Cache,即启动 Informer,以及启动 Controller。

设置 EventHandler

在整个架构中,Informer 扮演的角色是对 ApiServer 进行 ListWatch,检测到自己感兴趣的资源变化时,会根据注册的 eventHandler 进行处理,并判断是否需要推入队列。

所以,在使用过程中,我们可以在创建 Controller 时,将 Informer 的 eventHandler 函数注册进去,如下:

func start() {
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)
	// 1. init Manager
	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		Port:   9443,
	})
	// 2. init Reconciler(Controller)
	c, _ := controller.New("app", mgr, controller.Options{Reconciler: &ApplicationReconciler{}})
	_ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
		CreateFunc: func(event event.CreateEvent) bool {
			...
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			...
		},
		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
			...
		},
	})
	// 3. start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	}
}

在 predicate 中添加资源入 Queue 前的判断逻辑,可以有效防止队列被推入过多无用的资源。若我们 Reconciler 需要检测多种资源,这里 Controller 可以针对不同的资源类型,分别执行 watch,每次注册不同的 eventHandler。

设置 Cache selector

另外,我们还可以在 Informer 的 ListWatch 函数中添加有效的 LabelSelector 或 FieldSelector,进一步减少检测到的无效资源,在集群资源量大的情况下,也可以起到减少 ApiServer 压力的作用。具体如下:

func start() {
	scheme := runtime.NewScheme()
	_ = corev1.AddToScheme(scheme)
	// 1. init Manager
	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		Port:   9443,
		NewCache: cache.BuilderWithOptions(cache.Options{
			Scheme: scheme,
			SelectorsByObject: cache.SelectorsByObject{
				&corev1.Pod{}: {
					Label: labels.SelectorFromSet(labels.Set{}),
				},
				&corev1.Node{}: {
					Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
				},
			},
		}),
	})
	// 2. init Reconciler(Controller)
	c, _ := controller.New("app", mgr, controller.Options{Reconciler: &ApplicationReconciler{}})
	_ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
		CreateFunc: func(event event.CreateEvent) bool {
			...
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			...
		},
		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
			...
		},
	})
	// 3. start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	}
}

这里需要注意的是,controller-runtime 在 v0.11.0 版本中才开放设置 cache selector。

方法是在初始化 Manager 时,使用 cache.BuilderWithOptions 函数,将 LabelSelector 或 FieldSelector 注册进去,同时需要将 scheme 注册进去,以便 cache 生成的 Informer 对 ApiServer 发出请求时,同时给出资源 scheme。

这里可以看下源码,Cache 会生成 3 种 Informer,分别为 structured unstructuredmetadata。启动时也会同时启动这 3 种 Informer。如下:

func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	selectors SelectorsByGVK,
	disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),

		Scheme: scheme,
	}
}

// Start calls Run on each of the informers and sets started to true.  Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
	go m.structured.Start(ctx)
	go m.unstructured.Start(ctx)
	go m.metadata.Start(ctx)
	<-ctx.Done()
	return nil
}

其中,structured 为确定类型的资源,需要在 scheme 中注册对应的资源类型;unstructured 是不确定类型的资源;metadata 则是采用 protobuf 形式请求 ApiServer。

structured 为例:

    func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
    	// Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
    	// groupVersionKind to the Resource API we will use.
    	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
    	if err != nil {
    		return nil, err
    	}
    
    	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
    	if err != nil {
    		return nil, err
    	}
    	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
    	listObj, err := ip.Scheme.New(listGVK)
    	if err != nil {
    		return nil, err
    	}
    
    	// TODO: the functions that make use of this ListWatch should be adapted to
    	//  pass in their own contexts instead of relying on this fixed one here.
    	ctx := context.TODO()
    	// Create a new ListWatch for the obj
    	return &cache.ListWatch{
    		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
    			ip.selectors(gvk).ApplyToList(&opts)
    			res := listObj.DeepCopyObject()
    			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
    			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
    			err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
    			return res, err
    		},
    		// Setup the watch function
    		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
    			ip.selectors(gvk).ApplyToList(&opts)
    			// Watch needs to be set to true separately
    			opts.Watch = true
    			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
    			isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
    			return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
    		},
    	}, nil
    }

可以看到,在 Informer 的 ListWatch 接口中,p.selectors(gvk).ApplyToList(&opts) 会将我们一开始注册进来的 selector 添加到后面的 list/watch 请求中。

使用 Metadata

在上面一个例子中,我们提到 metadata 采用 protobuf 序列化形式请求 ApiServer,相比默认的序列化类型 json,protobuf 形式的请求效率更高,在大规模环境中性能更好。不过,不是所有的资源类型都支持 protobuf 格式,比如 CRD 就不支持。

还有一个需要注意的点是,在 Metadata 的数据中,watch 到的数据只有 metadata,没有 spec 和 status。使用示例如下:

func start() {
	scheme := runtime.NewScheme()
	// 1. init Manager
	mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme: scheme,
		Port:   9443,
		NewCache: cache.BuilderWithOptions(cache.Options{
			Scheme: scheme,
			SelectorsByObject: cache.SelectorsByObject{
				&corev1.Pod{}: {
					Label: labels.SelectorFromSet(labels.Set{}),
				},
				&corev1.Node{}: {
					Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
				},
			},
		}),
	})
	// 2. init Reconciler(Controller)
	c, _ := controller.New("app", mgr, controller.Options{})

	_ = ctrl.NewControllerManagedBy(mgr).
		For(&corev1.Pod{}).
		Complete(&ApplicationReconciler{})

	u := &metav1.PartialObjectMetadata{}
	u.SetGroupVersionKind(schema.GroupVersionKind{
		Kind:    "Pod",
		Group:   "",
		Version: "v1",
	})
	_ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
		CreateFunc: func(event event.CreateEvent) bool {
			return true
		},
		UpdateFunc: func(updateEvent event.UpdateEvent) bool {
			return true
		},
		DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
			return true
		},
	})
	// 3. start Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
	}
}

在 Cache 的 metadata 数据中,采用的数据格式是 meta.v1.PartialObjectMetadata,其使用前提是用户只关心资源的 metadata,对其 spec 及 status 并不关心,所以在对 ApiServer 的 ListWatch 函数中,只获取其 metadata。源码如下:

// PartialObjectMetadata is a generic representation of any object with ObjectMeta. It allows clients
// to get access to a particular ObjectMeta schema without knowing the details of the version.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PartialObjectMetadata struct {
	TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}

func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
	// Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
	// groupVersionKind to the Resource API we will use.
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}

	// Always clear the negotiated serializer and use the one
	// set from the metadata client.
	cfg := rest.CopyConfig(ip.config)
	cfg.NegotiatedSerializer = nil

	// grab the metadata client
	client, err := metadata.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	ctx := context.TODO()
	// create the relevant listwatch
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			ip.selectors(gvk).ApplyToList(&opts)

			var (
				list *metav1.PartialObjectMetadataList
				err  error
			)
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
			} else {
				list, err = client.Resource(mapping.Resource).List(ctx, opts)
			}
			if list != nil {
				for i := range list.Items {
					list.Items[i].SetGroupVersionKind(gvk)
				}
			}
			return list, err
		},
		// Setup the watch function
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			ip.selectors(gvk).ApplyToList(&opts)
			// Watch needs to be set to true separately
			opts.Watch = true

			var (
				watcher watch.Interface
				err     error
			)
			namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
			if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
				watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
			} else {
				watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
			}
			if watcher != nil {
				watcher = newGVKFixupWatcher(gvk, watcher)
			}
			return watcher, err
		},
	}, nil
}

可以看到,controller-runtime 使用的是 client-go.metadata.Client,这个 Client 的接口返回的数据格式是 PartialObjectMetadata

总结

controller-runtime 是一种很好用的生成资源控制器的工具,在平时的开发过程中,我们可以利用 controller-runtime 快速生成我们需要的资源控制器。同时,controller-runtime 也提供了很多方法,让我们不仅可以快速构建控制器,也可以针对不同的业务需求,进行灵活的配置,达到预期的效果。