随着云原生生态的不断发展,目前大多数基于 Kubernetes 的云原生技术,几乎都采用了 CRD + Controller 的模式。即使没有自定义 CRD,也会有需要 Controller 来检测自己感兴趣的资源,在其状态发生变更时,做一些业务所需工作。
controller-runtime 是 Kubernetes 社区提供的相对较好用的能够快速搭建一套对 ApiServer 进行 watch 的工具。本文会对 controller-runtime 的工作原理及不同场景下的使用方法做一个简单的总结和介绍。
架构
controller-runtime 的架构可以用下图概括。注:Webhook 不在本文讨论范围内,故图中舍去了 Webhook。
主要分为用户创建的 Manager 和 Reconciler 以及 Controller Runtime 自己启动的 Cache 和 Controller。先看用户侧的,Manager 是用户初始化的时候需要创建的,用来启动 Controller Runtime 的组件;Reconciler 是用户自己需要提供的组件,用于处理自己的业务逻辑。
而 controller-runtime 侧的组件,Cache 顾名思义就是缓存,用于建立 Informer 对 ApiServer 进行连接 watch 资源,并将 watch 到的 object 推入队列;Controller 一方面会向 Informer 注册 eventHandler,另一方面会从队列中拿数据并执行用户侧 Reconciler 的函数。
controller-runtime 侧整个工作流程如下:
首先 Controller 会先向 Informer 注册特定资源的 eventHandler;然后 Cache 会启动 Informer,Informer 向 ApiServer 发出请求,建立连接;当 Informer 检测到有资源变动后,使用 Controller 注册进来的 eventHandler 判断是否推入队列中;当队列中有元素被推入时,Controller 会将元素取出,并执行用户侧的 Reconciler。
用法
下面介绍几种不同场景下的使用方法。
一般用法
controller-runtime 的用法我们已经很熟悉了,最简单的用法可以用下面的代码表达:
func start() {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
// 1. init Manager
mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Port: 9443,
})
// 2. init Reconciler(Controller)
_ = ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(&ApplicationReconciler{})
// 3. start Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
}
}
type ApplicationReconciler struct {
}
func (a ApplicationReconciler) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}
第一步即初始化 Manager,同时生成一个默认配置的 Cache。
第二步是初始化 Controller。
ctrl.NewControllerManagedBy
:用于创建 Controller,同时将第一步生成的 Manager 的一些配置注入到 Controller 中;For
:Controller Runtime 提供的快捷方法,用来指定 watch 的资源类型;Owns
:有时候也会用到Owns
方法,表示某资源是我关心资源的从属,其 event 也会进去 Controller 的队列中;Complete
也是一种快捷方法,用于生成 Controller,将用户的 Reconciler 注册进 Controller,并生成 watch 资源的默认 eventHandler,同时执行 Controller 的watch
函数;
用户的 Reconciler 只需要实现 reconcile.Reconciler
接口即可。
最后一步就是启动 Manager,这一步中会同时启动 Cache,即启动 Informer,以及启动 Controller。
设置 EventHandler
在整个架构中,Informer 扮演的角色是对 ApiServer 进行 ListWatch,检测到自己感兴趣的资源变化时,会根据注册的 eventHandler 进行处理,并判断是否需要推入队列。
所以,在使用过程中,我们可以在创建 Controller 时,将 Informer 的 eventHandler 函数注册进去,如下:
func start() {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
// 1. init Manager
mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Port: 9443,
})
// 2. init Reconciler(Controller)
c, _ := controller.New("app", mgr, controller.Options{Reconciler: &ApplicationReconciler{}})
_ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
...
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
...
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
...
},
})
// 3. start Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
}
}
在 predicate 中添加资源入 Queue 前的判断逻辑,可以有效防止队列被推入过多无用的资源。若我们 Reconciler 需要检测多种资源,这里 Controller 可以针对不同的资源类型,分别执行 watch,每次注册不同的 eventHandler。
设置 Cache selector
另外,我们还可以在 Informer 的 ListWatch 函数中添加有效的 LabelSelector 或 FieldSelector,进一步减少检测到的无效资源,在集群资源量大的情况下,也可以起到减少 ApiServer 压力的作用。具体如下:
func start() {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
// 1. init Manager
mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Port: 9443,
NewCache: cache.BuilderWithOptions(cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {
Label: labels.SelectorFromSet(labels.Set{}),
},
&corev1.Node{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
},
},
}),
})
// 2. init Reconciler(Controller)
c, _ := controller.New("app", mgr, controller.Options{Reconciler: &ApplicationReconciler{}})
_ = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
...
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
...
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
...
},
})
// 3. start Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
}
}
这里需要注意的是,controller-runtime 在 v0.11.0 版本中才开放设置 cache selector。
方法是在初始化 Manager 时,使用 cache.BuilderWithOptions
函数,将 LabelSelector 或 FieldSelector 注册进去,同时需要将 scheme 注册进去,以便 cache 生成的 Informer 对 ApiServer 发出请求时,同时给出资源 scheme。
这里可以看下源码,Cache 会生成 3 种 Informer,分别为 structured
unstructured
及 metadata
。启动时也会同时启动这 3 种 Informer。如下:
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy),
Scheme: scheme,
}
}
// Start calls Run on each of the informers and sets started to true. Blocks on the context.
func (m *InformersMap) Start(ctx context.Context) error {
go m.structured.Start(ctx)
go m.unstructured.Start(ctx)
go m.metadata.Start(ctx)
<-ctx.Done()
return nil
}
其中,structured
为确定类型的资源,需要在 scheme 中注册对应的资源类型;unstructured
是不确定类型的资源;metadata
则是采用 protobuf
形式请求 ApiServer。
以 structured
为例:
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := ip.Scheme.New(listGVK)
if err != nil {
return nil, err
}
// TODO: the functions that make use of this ListWatch should be adapted to
// pass in their own contexts instead of relying on this fixed one here.
ctx := context.TODO()
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors(gvk).ApplyToList(&opts)
res := listObj.DeepCopyObject()
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors(gvk).ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
isNamespaceScoped := namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
},
}, nil
}
可以看到,在 Informer 的 ListWatch 接口中,p.selectors(gvk).ApplyToList(&opts)
会将我们一开始注册进来的 selector
添加到后面的 list/watch
请求中。
使用 Metadata
在上面一个例子中,我们提到 metadata
采用 protobuf
序列化形式请求 ApiServer,相比默认的序列化类型 json,protobuf 形式的请求效率更高,在大规模环境中性能更好。不过,不是所有的资源类型都支持 protobuf 格式,比如 CRD 就不支持。
还有一个需要注意的点是,在 Metadata 的数据中,watch 到的数据只有 metadata,没有 spec 和 status。使用示例如下:
func start() {
scheme := runtime.NewScheme()
// 1. init Manager
mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Port: 9443,
NewCache: cache.BuilderWithOptions(cache.Options{
Scheme: scheme,
SelectorsByObject: cache.SelectorsByObject{
&corev1.Pod{}: {
Label: labels.SelectorFromSet(labels.Set{}),
},
&corev1.Node{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "node01"}),
},
},
}),
})
// 2. init Reconciler(Controller)
c, _ := controller.New("app", mgr, controller.Options{})
_ = ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(&ApplicationReconciler{})
u := &metav1.PartialObjectMetadata{}
u.SetGroupVersionKind(schema.GroupVersionKind{
Kind: "Pod",
Group: "",
Version: "v1",
})
_ = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
return true
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
return true
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
})
// 3. start Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
}
}
在 Cache 的 metadata 数据中,采用的数据格式是 meta.v1.PartialObjectMetadata
,其使用前提是用户只关心资源的 metadata,对其 spec 及 status 并不关心,所以在对 ApiServer 的 ListWatch 函数中,只获取其 metadata。源码如下:
// PartialObjectMetadata is a generic representation of any object with ObjectMeta. It allows clients
// to get access to a particular ObjectMeta schema without knowing the details of the version.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type PartialObjectMetadata struct {
TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
}
func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
// Always clear the negotiated serializer and use the one
// set from the metadata client.
cfg := rest.CopyConfig(ip.config)
cfg.NegotiatedSerializer = nil
// grab the metadata client
client, err := metadata.NewForConfig(cfg)
if err != nil {
return nil, err
}
ctx := context.TODO()
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors(gvk).ApplyToList(&opts)
var (
list *metav1.PartialObjectMetadataList
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
} else {
list, err = client.Resource(mapping.Resource).List(ctx, opts)
}
if list != nil {
for i := range list.Items {
list.Items[i].SetGroupVersionKind(gvk)
}
}
return list, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors(gvk).ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
var (
watcher watch.Interface
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors(gvk))
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
} else {
watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
}
if watcher != nil {
watcher = newGVKFixupWatcher(gvk, watcher)
}
return watcher, err
},
}, nil
}
可以看到,controller-runtime 使用的是 client-go.metadata.Client
,这个 Client 的接口返回的数据格式是 PartialObjectMetadata
。
总结
controller-runtime 是一种很好用的生成资源控制器的工具,在平时的开发过程中,我们可以利用 controller-runtime 快速生成我们需要的资源控制器。同时,controller-runtime 也提供了很多方法,让我们不仅可以快速构建控制器,也可以针对不同的业务需求,进行灵活的配置,达到预期的效果。