Kubernetes container-native workflow engine: Argo

本文基于 Argo v2.7.6,Kubernetes v1.18

Argo 是一个基于 Kubernetes CRD 实现的工作流引擎,为 Kubernetes 提供了 container-native 工作流,即每个工作流节点都是以容器为单位,跑一个任务。

安装

既然是基于 Kubernetes CRD 实现的,那它的组成也就是 CRD + Controller。安装 Argo 很简单,只需要创建一个 argo namespace 和 apply 一个官方提供的 yaml 即可:

kubectl create namespace argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo/stable/manifests/install.yaml

安装好后可以看到 K8s 集群中安装了几个 CRD 和一个 Controller 及其 RBAC 资源。

除了使用 kubectl 来使用 argo workflow 之外,还可以使用 argo cli。在 Mac 环境下安装:

brew install argoproj/tap/argo

在 Linux 环境下安装:

# Download the binary
curl -sLO https://github.com/argoproj/argo/releases/download/v2.7.6/argo-linux-amd64

# Make binary executable
chmod +x argo-linux-amd64

# Move binary to path
mv ./argo-linux-amd64 /usr/local/bin/argo

# Test installation
argo version

工作原理

然后我们先来看 Argo 的工作原理。

架构

Argo 官网提供了一个 v2.5 版本之后的 Argo 架构图,如下:

argo server 向 argo cli 提供 api 服务,当接收到 argo cli 的请求, argo server 会调用 K8s Api 来操作资源;而 Controller 来响应 CRD 的资源变化。

argo server

argo server 分 Hosted 和 Local 两种模式。二者区别在于 Hosted 模式的 server 运行在 K8s 集群内部,而 Local 模式的 server 运行在 K8s 集群外部(大多数情况是运行在本地)。

本地启动 argo server:

$ argo server
INFO[0000]                                               authMode=server baseHRef=/ managedNamespace= namespace=default
INFO[0000] config map                                    name=workflow-controller-configmap
INFO[0000] Starting Argo Server                          version=v2.7.6+70facdb.dirty
INFO[0000] Argo Server started successfully on address :2746

启动完毕后访问本地的 2746 端口,可以看到一个 Argo server 提供的 ui 界面:

Workflow

Argo 最重要的一个 CRD 就是 Workflow,先看下其 Spec 定义:

type WorkflowSpec struct {
    // Templates is a list of workflow templates used in a workflow
    // +patchStrategy=merge
    // +patchMergeKey=name
    Templates []Template `json:"templates" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,1,opt,name=templates"`

    // Entrypoint is a template reference to the starting point of the workflow.
    Entrypoint string `json:"entrypoint,omitempty" protobuf:"bytes,2,opt,name=entrypoint"`

    // Arguments contain the parameters and artifacts sent to the workflow entrypoint
    // Parameters are referencable globally using the 'workflow' variable prefix.
    // e.g. {{workflow.parameters.myparam}}
    Arguments Arguments `json:"arguments,omitempty" protobuf:"bytes,3,opt,name=arguments"`

    ...
}

其中最重要的三个参数:
Templates:所有的工作流模板定义;
Entrypoint:工作流模板入口,也就是指定一个模板 name,指明该 workflow 从哪个模板开始;
Arguments:可以定义 workflow 的入参和制品信息,如果参数使用了 'workflow' 的前缀,表示为整个 workflow 全局可以用。

Template

再来看下 Template 的定义:

// Template is a reusable and composable unit of execution in a workflow
type Template struct {
    // Name is the name of the template
    Name string `json:"name" protobuf:"bytes,1,opt,name=name"`

    // Inputs describe what inputs parameters and artifacts are supplied to this template
    Inputs Inputs `json:"inputs,omitempty" protobuf:"bytes,5,opt,name=inputs"`

    // Outputs describe the parameters and artifacts that this template produces
    Outputs Outputs `json:"outputs,omitempty" protobuf:"bytes,6,opt,name=outputs"`

    // Metdata sets the pods's metadata, i.e. annotations and labels
    Metadata Metadata `json:"metadata,omitempty" protobuf:"bytes,9,opt,name=metadata"`

    // Steps define a series of sequential/parallel workflow steps
    Steps []ParallelSteps `json:"steps,omitempty" protobuf:"bytes,11,opt,name=steps"`

    // Container is the main container image to run in the pod
    Container *apiv1.Container `json:"container,omitempty" protobuf:"bytes,12,opt,name=container"`

    ...
}

可以看到,Template 在整个 workflow 中是可以被复用的。几个重要的参数有输入输出 Inputs、Outputs;该模板运行的 pod 的元数据 Metadata;模板运行的 Container 的定义;模板中定义的并行 Steps 等。

一个 workflow 可以定义一个入口模板(Entrypoint 指定)。其中 Template 和 Step 的关系如下图所示,而模板中可以定义多个并行的 Step,从而构成一个工作流;而 Step 中又可以引用其他模板,每个模板运行一个 pod 进行工作。

Step

再来看下 Step 的定义:

type ParallelSteps struct {
    Steps []WorkflowStep `protobuf:"bytes,1,rep,name=steps"`
}

// WorkflowStep is a reference to a template to execute in a series of step
type WorkflowStep struct {
    // Name of the step
    Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`

    // Template is the name of the template to execute as the step
    Template string `json:"template,omitempty" protobuf:"bytes,2,opt,name=template"`

    // Arguments hold arguments to the template
    Arguments Arguments `json:"arguments,omitempty" protobuf:"bytes,3,opt,name=arguments"`

    // WithItems expands a step into multiple parallel steps from the items in the list
    WithItems []Item `json:"withItems,omitempty" protobuf:"bytes,5,rep,name=withItems"`

    ...
}

ParallelSteps 中包含了一组 WorkflowStep,形成并行的 Step;WorkflowStep 引用一个模板,模板中可以再指定 ParallelSteps,有了这种串并行的 Step,Argo 就实现了任意形式的工作流的自定义功能。

Step 中的 Arguments 也是可以设置该 Step 中的入参及制品信息;WithItems 则将一个 Step 扩展到多个并行 Step。

上手使用

了解了 Argo 的工作原理之后,我们来实践一下,先使用一个 Argo 官网提供的一个例子:

argo submit --watch https://raw.githubusercontent.com/argoproj/argo/master/examples/loops-maps.yaml

先看下 Workflow 的 yaml 定义:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: loops-maps-
spec:
  entrypoint: loop-map-example
  templates:
  - name: loop-map-example
    steps:
    - - name: test-linux
        template: cat-os-release
        arguments:
          parameters:
          - name: image
            value: "{{item.image}}"
          - name: tag
            value: "{{item.tag}}"
        withItems:
        - { image: 'debian', tag: '9.1' }
        - { image: 'debian', tag: '8.9' }
        - { image: 'alpine', tag: '3.6' }
        - { image: 'ubuntu', tag: '17.10' }

  - name: cat-os-release
    inputs:
      parameters:
      - name: image
      - name: tag
    container:
      image: "{{inputs.parameters.image}}:{{inputs.parameters.tag}}"
      command: [cat]
      args: [/etc/os-release]

Worflow 的入口模板为 loop-map-example;该模板定义了四个并行执行的 Step,其执行参数是分别输入的。

由于打开了 watch 接口,我们可以在终端看到该 workflow 的运行情况:

Name:                loops-maps-qzv56
Namespace:           default
ServiceAccount:      default
Status:              Succeeded
Created:             Tue May 05 03:42:58 +0800 (4 seconds ago)
Started:             Tue May 05 03:42:58 +0800 (4 seconds ago)
Finished:            Tue May 05 03:43:02 +0800 (now)
Duration:            4 seconds

STEP                                         TEMPLATE          PODNAME                      DURATION  MESSAGE
 ✔ loops-maps-qzv56                          loop-map-example
 └-·-✔ test-linux(0:image:debian,tag:9.1)    cat-os-release    loops-maps-qzv56-2106556403  2s
   ├-✔ test-linux(1:image:debian,tag:8.9)    cat-os-release    loops-maps-qzv56-2252793177  3s
   ├-✔ test-linux(2:image:alpine,tag:3.6)    cat-os-release    loops-maps-qzv56-3723288758  3s
   └-✔ test-linux(3:image:ubuntu,tag:17.10)  cat-os-release    loops-maps-qzv56-3710784351  2s

在 Argo UI 中可以直观的看到 workflow 的运行情况:

2020/05/02 16:37 下午 posted in  Kubernetes

Spring 注解的实现

在 Java 中挣扎了一个月,每天都充满了 “我怎么这么菜” 的无力感。本着输出倒逼输入的心态,整理第一篇 Java 文章,愿自己能真心说一句 Java 真香。

自定义注解

注解本身只是元数据,描述了数据的数据,描述的对象可以是类、方法、属性、参数或构造器等。而自定义注解的实现是借助了接口的能力,通常有两类实现方式:BeanPostProcessor 和 BeanFactoryPostProcessor。

BeanPostProcessor

作用范围在属性,在类的属性上打上自定义注解,可以扩展类的能力。实现方法如下:

定义自定义注解

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface InjectSideCar {
}

完成 BeanPostProcessor 的实现

@Component
public class InjectSidecarAnnotationBeanPostProcessor implements BeanPostProcessor {

    private Class<? extends Annotation> changeAnnotationType;

    @Autowired
    private SideCar sideCar;

    public InjectSidecarAnnotationBeanPostProcessor() {
        this.changeAnnotationType = InjectSideCar.class;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        ReflectionUtils.doWithFields(bean.getClass(), field -> {
            ReflectionUtils.makeAccessible(field);
            if (field.isAnnotationPresent(changeAnnotationType)) {
                field.set(bean, sideCar);
            }
        });

        return bean;
    }
}

使用自定义注解

@Component
public class Demo {
    @InjectSideCar
    private SideCar sideCar;

    public void doSomething() {
        System.out.println(sideCar.doSomething());
    }
}

在测试类中进行验证:

@SpringBootTest
class HdlsApplicationTests {
    @Autowired
    private Demo d;

    @Test
    public void testDemo() {
        d.doSomething();
    }
}

运行结果如下:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.6.RELEASE)

2020-04-21 00:39:06.014  WARN 37818 --- [           main] o.s.boot.StartupInfoLogger               : InetAddress.getLocalHost().getHostName() took 5003 milliseconds to respond. Please verify your network configuration (macOS machines may need to add entries to /etc/hosts).
2020-04-21 00:39:11.020  INFO 37818 --- [           main] com.example.hdls.HdlsApplicationTests    : Starting HdlsApplicationTests on hdls.local with PID 37818 (started by zhuweiwei in /Users/zhuweiwei/Java/hdls)
2020-04-21 00:39:11.021  INFO 37818 --- [           main] com.example.hdls.HdlsApplicationTests    : No active profile set, falling back to default profiles: default
2020-04-21 00:39:11.299  INFO 37818 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'sideCar' of type [com.example.hdls.SideCar] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-04-21 00:39:11.352  INFO 37818 --- [           main] com.example.hdls.HdlsApplicationTests    : Started HdlsApplicationTests in 15.497 seconds (JVM running for 16.32)

I am a sidecar

BeanFactoryPostProcessor

作用范围在类,在类上打上自定义注解,可以实现自动注入 bean。实现方法如下:

自定义注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomBean {
}

完成 BeanFactoryPostProcessor 的实现

@Component
public class CustomBeanDefinitionRegistryPostProcessor implements BeanFactoryPostProcessor {
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner((BeanDefinitionRegistry) configurableListableBeanFactory);
        scanner.addIncludeFilter(new AnnotationTypeFilter(CustomBean.class));
        scanner.scan("com.example.hdls");
    }
}

使用自定义注解

@CustomBean
public class Demo {

    public void doSomething() {
        System.out.println("I am a custom bean.");
    }
}

在测试类中进行验证:

@SpringBootTest
class HdlsApplicationTests {

    @Autowired
    private Demo d;

    @Test
    void contextLoads() {
    }

    @Test
    public void testDemo() {
        d.doSomething();
    }
}

运行结果如下:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.6.RELEASE)

2020-04-21 00:45:28.288  WARN 37852 --- [           main] o.s.boot.StartupInfoLogger               : InetAddress.getLocalHost().getHostName() took 5001 milliseconds to respond. Please verify your network configuration (macOS machines may need to add entries to /etc/hosts).
2020-04-21 00:45:33.298  INFO 37852 --- [           main] com.example.hdls.HdlsApplicationTests    : Starting HdlsApplicationTests on hdls.local with PID 37852 (started by zhuweiwei in /Users/zhuweiwei/Java/hdls)
2020-04-21 00:45:33.299  INFO 37852 --- [           main] com.example.hdls.HdlsApplicationTests    : No active profile set, falling back to default profiles: default
2020-04-21 00:45:33.566  INFO 37852 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'sideCar' of type [com.example.hdls.SideCar] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-04-21 00:45:33.619  INFO 37852 --- [           main] com.example.hdls.HdlsApplicationTests    : Started HdlsApplicationTests in 15.497 seconds (JVM running for 16.223)

I am a custom bean.
2020/04/07 10:59 上午 posted in  Java

图解 Deployment Controller 工作流程

本文基于对 Kubernetes v1.16 的源码阅读,通过流程图描述 Deployment Controller 的工作流程,但也包含相关代码以供参考

上一篇文章《Kubernetes Controller Manager 工作原理》讲解了 Controller Manager 是怎么管理 Controller 的,我们知道 Controller 只需要实现相关事件 handler,无须再关心上层逻辑。本文将基于这篇文章,介绍 Deployment Controller 在接收到来自 Informer 的事件后,做了哪些工作。

Deployment 与控制器模式

在 K8s 中,pod 是最小的资源单位,而 pod 的副本管理是通过 ReplicaSet(RS) 实现的;而 deployment 实则是基于 RS 做了更上层的工作。

这就是 Kubernetes 的控制器模式,顶层资源通过控制下层资源,来拓展新能力。deployment 并没有直接对 pod 进行管理,是通过管理 rs 来实现对 pod 的副本控制。deployment 通过对 rs 的控制实现了版本管理:每次发布对应一个版本,每个版本有一个 rs,在注解中标识版本号,而 rs 再每次根据 pod template 和副本数运行相应的 pod。deployment 只需要保证任何情况下 rs 的状态都在预期,rs 保证任何情况下 pod 的状态都在预期。

K8s 是怎么管理 Deployment 的

了解了 deployment 这一资源在 K8s 中的定位,我们再来看下这个资源如何达到预期状态。

Kubernetes 的 API 和控制器都是基于水平触发的,可以促进系统的自我修复和周期协调。

水平触发这个概念来自硬件的中断,中断可以是水平触发,也可以是边缘触发:

  • 水平触发 : 系统仅依赖于当前状态。即使系统错过了某个事件(可能因为故障挂掉了),当它恢复时,依然可以通过查看信号的当前状态来做出正确的响应。
  • 边缘触发 : 系统不仅依赖于当前状态,还依赖于过去的状态。如果系统错过了某个事件(“边缘”),则必须重新查看该事件才能恢复系统。

Kubernetes 水平触发的 API 实现方式是:控制器监视资源对象的实际状态,并与对象期望的状态进行对比,然后调整实际状态,使之与期望状态相匹配。

水平触发的 API 也叫声明式 API,而监控 deployment 资源对象并确定符合预期的控制器就是 deployment controller,对应的 rs 的控制器就是 rs controller。

Deployment Controller

架构

首先看看 DeploymentController 在 K8s 中的定义:

type DeploymentController struct {
    rsControl     controller.RSControlInterface
    client        clientset.Interface
    eventRecorder record.EventRecorder

    syncHandler func(dKey string) error
    enqueueDeployment func(deployment *apps.Deployment)

    dLister appslisters.DeploymentLister
    rsLister appslisters.ReplicaSetLister
    podLister corelisters.PodLister

    dListerSynced cache.InformerSynced
    rsListerSynced cache.InformerSynced
    podListerSynced cache.InformerSynced

    queue workqueue.RateLimitingInterface
}

主要包括几大块内容:

  • rsControl 是一个 ReplicaSet Controller 的工具,用来对 rs 进行认领和弃养工作;
  • client 就是与 APIServer 通信的 client;
  • eventRecorder 用来记录事件;

  • syncHandler 用来处理 deployment 的同步工作;

  • enqueueDeployment 是一个将 deployment 入 queue 的方法;

  • dListerrsListerpodLister 分别用来从 shared informer store 中获取 资源的方法;

  • dListerSyncedrsListerSyncedpodListerSynced 分别是用来标识 shared informer store 中是否同步过;

  • queue 就是 workqueue,deployment、replicaSet、pod 发生变化时,都会将对应的 deployment 推入这个 queue,syncHandler() 方法统一从 workqueue 中处理 deployment。

工作流程

接下来看看 deployment controller 的工作流程。

deployment controller 利用了 informer 的工作能力,实现了资源的监听,同时与其他 controller 协同工作。主要使用了三个 shared informer —— deployment informer、rs informer、pod informer。

首先 deployment controller 会向三个 shared informer 中注册钩子函数,三个钩子函数会在相应事件到来时,将相关的 deployment 推进 workqueue 中。

deployment controller 启动时会有一个 worker 来控制 syncHandler 函数,实时地将 workqueue 中的 item 推出,根据 item 来执行任务。主要包括:领养和弃养 rs、向 eventRecorder 分发事件、根据升级策略决定如何处理下级资源。

workqueue

首先看看 workqueue,这其实是用来辅助 informer 对事件进行分发的队列。整个流程可以梳理为下图。

可以看到,workqueue 分为了三个部分,一是一个先入先出的队列,由切片来实现;还有两个是名为 dirty 和 processing 的 map。整个工作流程分为三个动作,分别为 add、get、done。

add 是将消息推入队列。消息时从 informer 过来的,其实过来的消息并不是事件全部,而是资源 key,即 namespace/name,以这种形式通知业务逻辑有资源的变更事件过来了,需要拿着这个 key 去 indexer 中获取具体资源。如上图绿色的过程所示,在消息进行之后,先检查 dirty 中是否存在,若已存在,不做任何处理,表明事件已经在队列中,不需要重复处理;若 dirty 中不存在,则将该 key 存入 dirty 中(将其作为 map 的键,值为空结构体),再推入队列一份。

get 是 handle 函数从队列中获取 key 的过程。将 key 从队列中 pop 出来的同时,会将其放入 processing 中,并删除其在 dirty 的索引。这一步的原因是将 item 放入 processing 中标记其正在被处理,同时从 dirty 中删除则不影响后面的事件入队列。

done 则是 handle 在处理完 key 之后,必须执行的一步,相当于给 workqueue 发一个 ack,表明我已经处理完毕,该动作仅仅将其从 processing 中删除。

有了这个小而美的先入先出的队列,我们就可以避免资源的多个事件发生时,从 indexer 中重复获取资源的事情发生了。下面来梳理 deployment controller 的具体流程。

replicaSet 的认领和弃养过程

在 deployment controller 的工作流程中,可以注意到除了 deployment 的三个钩子函数,还有 rs 和 pod 的钩子函数,在 rs 的三个钩子函数中,涉及到了 deployment 对 rs 的领养和弃养过程。

rs 认亲

首先来看 rs 的认亲过程:

在 rs 的三个钩子函数中,都会涉及到认亲的过程。

当监听到 rs 的变化后,会根据 rs 的 ownerReferences 字段找到对应的 deployment 入 queue;若该字段为空,意味着这是个孤儿 rs,启动 rs 认亲机制。

认亲过程首先是遍历所有的 deployment,判断 deployment 的 selector 是否与当前 rs 的 labels 相匹配,找到所有与之匹配的 deployment。

然后判断总共有多少个 deployment,若为 0 个,就直接返回,没有人愿意认领,什么都不做,认领过程结束;若大于 1 个,抛错出去,因为这是不正常的行为,不允许多个 deployment 同时拥有同一个 rs;若有且仅有一个 deployment 与之匹配,那么就找到了愿意领养的 deployment,将其入 queue。

addReplicaSet()updateReplicaSet() 的认领过程类似,这里只展示 addReplicaSet() 的代码:

func (dc *DeploymentController) addReplicaSet(obj interface{}) {
    rs := obj.(*apps.ReplicaSet)

    if rs.DeletionTimestamp != nil {
        dc.deleteReplicaSet(rs)
        return
    }

    if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
        d := dc.resolveControllerRef(rs.Namespace, controllerRef)
        if d == nil {
            return
        }
        klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
        dc.enqueueDeployment(d)
        return
    }

    ds := dc.getDeploymentsForReplicaSet(rs)
    if len(ds) == 0 {
        return
    }
    klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
    for _, d := range ds {
        dc.enqueueDeployment(d)
    }
}

deployment 领养和弃养

deployment 对 rs 的领养和弃养过程,是发生在从 workqueue 中处理 item 的过程,也是找到当前 deployment 拥有的所有 rs 的过程。

该过程会轮询所有的 rs,如果有 owner 且是当前 deployment,再判断 label 是否满足 deployment 的 selector,满足则列入结果;若不满足,启动弃养机制,仅仅将 rs 的 ownerReferences 删除,使其成为孤儿,不做其他事情。这也是为什么修改了 deployment 的 selector 之后,会多一个 replicas!=0 的 rs 的原因。

如果 rs 没有 owner,是个孤儿,判断 label 是否满足 deployment 的 selector,满足条件,则启动领养机制,将其 ownerReferences 设置为当前 deployment,再列入结果。

下面是整个过程的源码:

func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
    controllerRef := metav1.GetControllerOf(obj)
    if controllerRef != nil {
        if controllerRef.UID != m.Controller.GetUID() {
            return false, nil
        }
        if match(obj) {
            return true, nil
        }

        if m.Controller.GetDeletionTimestamp() != nil {
            return false, nil
        }
        if err := release(obj); err != nil {
            if errors.IsNotFound(err) {
                return false, nil
            }
            return false, err
        }
        return false, nil
    }

    if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
        return false, nil
    }
    if obj.GetDeletionTimestamp() != nil {
        return false, nil
    }

    if err := adopt(obj); err != nil {
        if errors.IsNotFound(err) {
            return false, nil
        }
        return false, err
    }
    return true, nil
}

领养和弃养的函数:

func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error {
    if err := m.CanAdopt(); err != nil {
        return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
    }

    addControllerPatch := fmt.Sprintf(
        `{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
        m.controllerKind.GroupVersion(), m.controllerKind.Kind,
        m.Controller.GetName(), m.Controller.GetUID(), rs.UID)
    return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch))
}

func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.ReplicaSet) error {
    klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
        replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
    deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), replicaSet.UID)
    err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch))
    if err != nil {
        if errors.IsNotFound(err) {
            return nil
        }
        if errors.IsInvalid(err) {
            return nil
        }
    }
    return err
}

rolloutRecreate

如果 deployment 的更新策略是 Recreate,其过程是将旧的 pod 删除,再启动新的 pod。具体过程如下:

首先根据上一步 rs 的领养和弃养过程,获得当前 deployment 的所有 rs,排序找出最新的 rs,将其 pod template 与 deployment 的 pod template 比较,若不一致需要创建新的 rs;

创建新的 rs 的过程为:计算当前 deployment 的 pod template 的 hash 值,将其增加至 rs label 及 selector 中;

对所有旧的 rs 计算出最大的 revision,将其加一,作为新 rs 的 revision,为新的 rs 设置如下注解:

"deployment.kubernetes.io/revision"
"deployment.kubernetes.io/desired-replicas"
"deployment.kubernetes.io/max-replicas"

如果当前 deployment 的 revision 不是最新,将其设为最新;如果需要更新状态,则更新其状态;

将旧的 rs 进行降级,即将其副本数设为 0;

判断当前所有旧的 pod 是否停止,判断条件为 pod 状态为 failed 或 succeed,unknown 或其他所有状态都不是停止状态;若并非所有 pod 都停止了,则退出本次操作,下一个循环再处理;

若所有 pod 都停止了,将新的 rs 进行升级,即将其副本数置为 deployment 的副本数;

最后进行清理工作,比如旧的 rs 数过多时,删除多余的 rs 等。

下面是 rolloutRecreate 的源代码:

func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
    newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
    if err != nil {
        return err
    }
    allRSs := append(oldRSs, newRS)
    activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

    scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
    if err != nil {
        return err
    }
    if scaledDown {
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    if oldPodsRunning(newRS, oldRSs, podMap) {
        return dc.syncRolloutStatus(allRSs, newRS, d)
    }

    if newRS == nil {
        newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
        if err != nil {
            return err
        }
        allRSs = append(oldRSs, newRS)
    }

    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
        return err
    }

    if util.DeploymentComplete(d, &d.Status) {
        if err := dc.cleanupDeployment(oldRSs, d); err != nil {
            return err
        }
    }

    return dc.syncRolloutStatus(allRSs, newRS, d)
}

rolloutRolling

如果 deployment 的更新策略是 Recreate,其过程是将旧的 pod 删除,再启动新的 pod。具体过程如下:

从上图可以看到,从开始到创建新的 rs 的过程与 rolloutRecreate 过程一致,唯一区别在于,设置新 rs 副本数的过程。在 rolloutRolling 的过程中,新的 rs 的副本数为 deploy.replicas + maxSurge - currentPodCount。代码如下:

func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
    switch deployment.Spec.Strategy.Type {
    case apps.RollingUpdateDeploymentStrategyType:
        // Check if we can scale up.
        maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
        if err != nil {
            return 0, err
        }
        // Find the total number of pods
        currentPodCount := GetReplicaCountForReplicaSets(allRSs)
        maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
        if currentPodCount >= maxTotalPods {
            // Cannot scale up.
            return *(newRS.Spec.Replicas), nil
        }
        // Scale up.
        scaleUpCount := maxTotalPods - currentPodCount
        // Do not exceed the number of desired replicas.
        scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
        return *(newRS.Spec.Replicas) + scaleUpCount, nil
    case apps.RecreateDeploymentStrategyType:
        return *(deployment.Spec.Replicas), nil
    default:
        return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
    }
}

然后到了增减新旧 rs 副本数的过程。主要为先 scale up 新 rs,再 scale down 旧 rs。scale up 新 rs 的过程与上述一致;scale down 旧 rs 的过程为先计算一个最大 scale down 副本数,若小于 0 则不做任何操作;然后在 scale down 的时候做了一个优化,先 scale down 不正常的 rs,可以保证先删除那些不健康的副本;最后如果还有余额,再 scale down 正常的 rs。

每次 scale down 的副本数为 allAvailablePodCount - minAvailable,即 allAvailablePodCount - (deploy.replicas - maxUnavailable)。代码如下:

func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
    maxUnavailable := deploymentutil.MaxUnavailable(*deployment)

    // Check if we can scale down.
    minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
    // Find the number of available pods.
    availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
    if availablePodCount <= minAvailable {
        // Cannot scale down.
        return 0, nil
    }
    klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)

    sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

    totalScaledDown := int32(0)
    totalScaleDownCount := availablePodCount - minAvailable
    for _, targetRS := range oldRSs {
        if totalScaledDown >= totalScaleDownCount {
            // No further scaling required.
            break
        }
        if *(targetRS.Spec.Replicas) == 0 {
            // cannot scale down this ReplicaSet.
            continue
        }
        // Scale down.
        scaleDownCount := int32(integer.IntMin(int(*(targetRS.Spec.Replicas)), int(totalScaleDownCount-totalScaledDown)))
        newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
        if newReplicasCount > *(targetRS.Spec.Replicas) {
            return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
        }
        _, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
        if err != nil {
            return totalScaledDown, err
        }

        totalScaledDown += scaleDownCount
    }

    return totalScaledDown, nil
}

总结

因为 Kubernetes 采用声明式 API,因此对于 Controller 来说,所做的事情就是根据当前事件计算一个预期的状态,并判断当前实际的状态是否满足预期的状态,如果不满足,则采取行动使其靠拢。

本文通过对 Deployment Controller 的工作流程进行分析,虽然做的事情比较繁琐,但是其所做的事情都是围绕 “向预期靠拢” 这个目标展开的。

希望这两篇文章能让你一窥豹斑,了解 Kubernetes Controller Manager 大致的工作流程和实现方式。

2019/12/15 14:37 下午 posted in  Kubernetes

Golang 常用并发编程技巧

Golang 是最早将 CSP 原则纳入其核心的语言之一,并将这种并发编程风格引入到大众中。CSP 指的是 Communicating Sequential Processes ,即通信顺序进程,每个指令都需要指定具体是一个输出变量(从一个进程中读取一个变量的情况),还是一个目的地(将输入发送到一个进程的情况)。

Golang 不仅提供了 CSP 样式的并发方式,还支持通过内存访问同步的传统方式,本文对最常用的 Golang 并发编程工具做一个总结。

sync 包

sync 包包含了对低级别内存访问同步最有用的并发原语,是 “内存访问同步” 的最有利工具,也是传统并发模型解决临界区问题的常用工具。

WaitGroup

WaitGroup 是等待一组并发操作完成的方法,包含了三个函数:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

其中,Add() 用来添加 goroutine 的个数,Done() 是 goroutine 用来表明执行完成并退出,将计数减一,而 Wait() 用来等待所有 goroutine 退出。

用法如下:

func main() {
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Printf("goroutine 结束\n")
    }()

    wg.Wait()
}

需要注意的是,Add() 方法需要在 goroutine 之前执行。

互斥锁和读写锁

互斥是保护程序中临界区的一种方式。一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞直到互斥锁被解锁(重新争抢对互斥锁的锁定)。

用法如下:

func main() {
    var lock sync.Mutex
    var count int
    var wg sync.WaitGroup

    wg.Add(1)
    // count 加 1
    go func() {
        defer wg.Done()
        lock.Lock()
        defer lock.Unlock()
        count++
        fmt.Println("count=", count)
    }()

    // count 减 1
    wg.Add(1)
    go func() {
        defer wg.Done()
        lock.Lock()
        defer lock.Unlock()
        count--
        fmt.Println("count=", count)
    }()

    wg.Wait()
    fmt.Println("count=", count)
}

需要注意的是,在 goroutine 里用 defer 来调用 Unlock 是个常见的习惯用法,确保了即使出现了 panic,调用也总是执行,防止出现死锁。

读写锁在概念上跟互斥锁是一样的:保护对内存的访问,读写锁让你对内存有更多的控制。读写锁与互斥锁最大的不同就是可以分别对读、写进行锁定。一般用在大量读操作、少量写操作的情况。

读写锁的 Lock() 和 Unlock() 是对写操作的锁定和解锁;Rlock() 和 RUnlock() 是对读操作的锁定和解锁,需要配对使用。而读锁和写锁的关系:

  1. 同时只能有一个 goroutine 能够获得写锁定。
  2. 同时可以有任意多个 gorouinte 获得读锁定。
  3. 同时只能存在写锁定或读锁定(读和写互斥)。

Channel

Channel 是 CSP 派生的同步原语之一,是 Golang 推崇的 “使用通信来共享内存,而不是通过共享内存来通信” 理念的最有利的工具。

Channel 的基本使用这里不展开讲,但对不同状态下的 Channel 不同操作的结果做一个总结:

操作 Channel 状态 结果
Read nil 阻塞
打开非空 输出值
打开但空 阻塞
关闭 <默认值>, false
只写 编译错误
Write nil 阻塞
打开但填满 阻塞
打开不满 写入值
关闭 panic
只读 编译错误
Close nil panic
打开非空 关闭 Channel; 读取成功,直到 Channel 耗尽,读取产生值的默认值
打开但空 关闭 Channel;读到生产者的默认值
关闭 panic
只读 编译错误

for-select

select 语句是将 Channel 绑定在一起的粘合剂,能够让一个 goroutine 同时等待多个 Channel 达到准备状态。

select 语句是针对 Channel 的操作,语法上看上去与 switch 很像,但不同的是,select 块中的 case 语句没有测试顺序,如果没有满足任何条件,执行也不会失败。用法如下:

var c1, c2 <-chan interface{}
select {
  case <- c2:
    // 某段逻辑
  case <- c2:
    // 某段逻辑
}

上面这个 select 控制结构会等待所有 case 条件语句任意一个的返回,无论哪一个返回都会立刻执行 case 中的代码,不过如果了 select 中的两个 case 同时被触发,就会随机选择一个 case 执行。

for-select 是一个很常见的用法,通常在 “向 Channel 发送迭代变量” 和 “循环等待停止” 两种情况下会用到,用法如下:

向 Channel 发送迭代变量:

func main() {
    c := make(chan int, 3)
    for _, s := range []int{1, 2, 3} {
        select {
        case c <- s:
        }
    }
}

循环等待停止:

// 第一种
for {
  select {
  case <- done:
    return
  default:
    // 进行非抢占式任务
  }
}
// 第二种
for {
  select {
  case <- done:
    return
  default:
  }
  // 进行非抢占式任务
}

第一种是指,当我们输入 select 语句时,如果完成的 Channel 尚未关闭,我们将执行 default 语句;第二种是指,如果已经完成的 Channel 未关闭,我们将退出 select 语句并继续执行 for 循环的其余部分。

done channel

虽然 goroutine 廉价且易于利用,运行时可以将多个 goroutine 复用到任意数量的操作系统线程,但我们需要知道的是 goroutine 是需要消耗资源的,并且是不会被运行时垃圾回收的。如果出现 goroutine 泄露的情况,严重的时候会导致内存利用率的下降。

而 done channel 就是防止 goroutine 泄露的利器。用 done channel 在父子 goroutine 之间建立一个 “信号通道”,父 goroutine 可以将该 channel 传递给子 goroutine ,然后在想要取消子 goroutine 的时候关闭该 channel。用法如下:

func main() {
    doneChan := make(chan interface{})

    go func(done <-chan interface{}) {
       for {
          select {
          case <-done:
            return
          default:
          }
        }
    }(doneChan)

    // 父 goroutine 关闭子 goroutine
    close(doneChan)
}

确保 goroutine 不泄露的方法,就是规定一个约定:如果 goroutine 负责创建 goroutine,它也负责确保它可以停止 goroutine。

Context 包

Context 包是专门用来简化对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。Context 包的目的主要有两个:提供一个可以取消你的调用图中分支的 API,提供用于通过呼叫传输请求范围数据的数据包。

如果使用 Context 包,那么位于顶级并发调用下游的每个函数都会将 context 作为其第一个参数。

Context 的类型如下:

type Context interface {
  Deadline() (deadline time.Time, ok bool)
  Done() <-chan struct{}
  Err() error
  Value(key interface{}) interface{}
}

其中,Deadline 函数用于指示在一定时间后 goroutine 是否会被取消;Done 方法返回当我们的函数被抢占时关闭的 Channel;Err 方法返回取消的错误原因,因为什么 Context 被取消;Value 函数返回与此 Context 关联的 key 或 nil。

Context 虽然是个接口,但是我们在使用它的时候并不需要实现,context 包内置的两个方法来创建上下文的实例:

func Background() Context
func TODO() Context

Background 主要用于 main 函数、初始化以及测试代码中,作为Context 这个树结构的最顶层的 Context,不能被取消;TODO,如果我们不知道该使用什么 Context 的时候,可以使用这个,但是实际应用中,暂时还没有使用过这个 TODO。

然后以此作为最顶层的父 Context,衍生出子 Context 启动调用链。而这些 Context 对象形成了一棵树,当父 Context 对象被取消时,它的所有子 Context 都会被取消。context 包还提供了一系列函数用以产生子 Context:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

其中,WithCancel 返回一个新的 Context,在调用返回的 cancel 函数时关闭其 done channel;WithDeadline 返回一个新的 Context,当机器的时钟超过给定的最后期限时,它关闭完成的 channel;WithTimeout 返回一个新的 Context,在给定的超时时间后关闭其完成的 channel;WithValue 生成一个绑定了一个键值对数据的 Context,这个绑定的数据可以通过 Context.Value 方法访问到。

下面来看使用方法:

WithCancel

func main() {
    wg := sync.WaitGroup{}
    ctx, cancel := context.WithCancel(context.Background())

    wg.Add(1)
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Err:", ctx.Err())
                return
            default:
            }
        }
    }(ctx)

    cancel()
    wg.Wait()
}

WithDeadline

func main() {
    d := time.Now().Add(1 * time.Second)
    wg := sync.WaitGroup{}
    ctx, cancel := context.WithDeadline(context.Background(), d)
    defer cancel()

    wg.Add(1)
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Err:", ctx.Err())
                return
            default:
            }
        }
    }(ctx)

    wg.Wait()
}

WithTimeout

func main() {
    wg := sync.WaitGroup{}
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    wg.Add(1)
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Err:", ctx.Err())
                return
            default:
            }
        }
    }(ctx)

    wg.Wait()
}

WithValue

func main() {
    wg := sync.WaitGroup{}
    ctx, cancel := context.WithCancel(context.Background())
    valueCtx := context.WithValue(ctx, "key", "add value")

    wg.Add(1)
    go func(ctx context.Context) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Err:", ctx.Err())
                return
            default:
                fmt.Println(ctx.Value("key"))
                time.Sleep(1 * time.Second)
            }
        }
    }(valueCtx)

    time.Sleep(5*time.Second)
    cancel()
    wg.Wait()
}
2019/11/02 14:55 下午 posted in  Golang

使用 Kubebuilder 创建自定义 K8s AdmissionWebhooks

Kubebuilder 除了可以构建 CRD API 及其 Controller 之外,还能构建 AdmissionWebhooks。这篇文章就来详细分析 Kubebuilder 如何构建 AdmissionWebhooks。

K8s 的 AdmissionWebhooks

首先要知道,在 K8s 里 AdmissionWebhooks 是什么,目的是什么。

先说场景,如果我们需要在 pod 创建出来之前,对其进行配置修改或者检查,这部分工作如果放在 ApiServer 里,需要管理员在 ApiServer 中将其编译成二进制文件,如果配置修改想做成自定义的形式会非常麻烦。而 Admission controllers 就是为这种场景而生的工具,以插件的形式附着到 ApiServer 中,AdmissionWebhooks 就是其中一种准入插件。

K8s 的 AdmissionWebhooks 分两种:MutatingAdmissionWebhookValidatingAdmissionWebhook,二者合起来就是一个特殊类型的 admission controllers,一个处理资源更改,一个处理验证。

在之前一篇文章《[译]深入剖析 Kubernetes MutatingAdmissionWebhook》中,给出了非常详细的 MutatingAdmissionWebhook 的教程,其中主要做了三件事情:

  1. MutatingWebhookConfiguration:MutatingAdmissionWebhook 向 ApiServer 注册的配置;
  2. MutatingAdmissionWebhook 本身:一种插件形式的 admission controller,需要向 ApiServer 注册自己;
  3. Webhook Admission Server:一个附着到 k8s ApiServer 的 http server,接收 ApiServer 的请求。

那么用 Kubebuilder 构建 AdmissionWebhooks 的话,Kubebuilder 会为我们自动生成 Webhook Server,并留下几个函数让我们添加自有逻辑。

创建自定义 AdmissionWebhooks

这里使用一个简单的场景做一个演示,我们自定义一个名为 App 资源,当用户创建一个 App 实例时,我们根据用户的描述创建出一个 Deployment。

然后我们添加一个 MutatingAdmissionWebhook,当用户通过 App 创建 Deployment 时,自动添加一个 sidecar 容器到 Pod 中(这里使用 nginx 作为 sidecar)。

本文所用 kubebuilder 版本为 2.0.1,完整的项目代码可见:https://github.com/zwwhdls/KubeAdmissionWebhookDemo

初始化 API 及 Controller

第一步是创建出 CRD 及其 Controller,几行命令就能搞定:

$ export GO111MODULE=on

$ mkdir $GOPATH/src/zww-app
$ cd $GOPATH/src/zww-app
$ kubebuilder init --domain o0w0o.cn --owner "zwwhdls"

$ kubebuilder create api --group app --version v1 --kind App

我这里做的比较简单,AppSpec 只定义了一个 deploy 属性(就是 appsv1.DeploymentSpec),Controller 中会根据 deploy 属性生成对应的 Deployment:

type AppSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
    // Important: Run "make" to regenerate code after modifying this file
    Deploy appsv1.DeploymentSpec `json:"deploy,omitempty"`
}

在完善了 AppSpec 和 Controller 的 Reconcile 函数后,使 Kubebuilder 重新生成代码,并将 config/crd 下的 CRD yaml 应用到当前集群:

make
make install

创建 Webhook Server

接下来就是用 Kubebuilder 来生成 Webhooks 了:

kubebuilder create webhook --group app --version v1 --kind App 

在路径 api/v1 下生成了一个名为 app_webhook.go 的文件。可以看到 Kubebuilder 已经帮你定义了两个变量:

var _ webhook.Defaulter = &App{}
var _ webhook.Validator = &App{}

这两个变量分别表示 MutatingWebhookServer 和 ValidatingWebhookServer,在程序启动的时候,这两个 Server 会 run 起来。

对于 MutatingWebhookServer,Kubebuilder 预留了 Default() 函数,让用户来填写自己的逻辑:

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *App) Default() {
    applog.Info("default", "name", r.Name)

    // TODO(user): fill in your defaulting logic.
}

对于我们希望 Webhook 在资源发生什么样的变化时触发,可以通过这条注释修改:

// +kubebuilder:webhook:path=/mutate-app-o0w0o-cn-v1-app,mutating=true,failurePolicy=fail,groups=app.o0w0o.cn,resources=apps,verbs=create;update,versions=v1,name=mapp.kb.io

对应的参数为:

  • failurePolicy:表示 ApiServer 无法与 webhook server 通信时的失败策略,取值为 "ignore" 或 "fail";
  • groups:表示这个 webhook 在哪个 Api Group 下会收到请求;
  • mutating:这个参数是个 bool 型,表示是否是 mutating 类型;
  • name:webhook 的名字,需要与 configuration 中对应;
  • path:webhook 的 path;
  • resources:表示这个 webhook 在哪个资源发生变化时会收到请求;
  • verbs:表示这个 webhook 在资源发生哪种变化时会收到请求,取值为 “create“, "update", "delete", "connect", 或 "*" (即所有);
  • versions:表示这个 webhook 在资源的哪个 version 发生变化时会收到请求;

对于 ValidatingWebhookServer,Kubebuilder 的处理与 MutatingWebhookServer 一致,这里不再赘述。

方便起见,我只定义了 MutatingWebhookServer 的 Default 函数,为每个 App 类型资源的 pod 注入一个 nginx sidecar 容器:

func (r *App) Default() {
    applog.Info("default", "name", r.Name)
    var cns []core.Container
    cns = r.Spec.Deploy.Template.Spec.Containers

    container := core.Container{
        Name:  "sidecar-nginx",
        Image: "nginx:1.12.2",
    }

    cns = append(cns, container)
    r.Spec.Deploy.Template.Spec.Containers = cns
}

运行 Webhook Server

本文仅分享本地开发测试的调试方案,线上部署方案请参考官方文档

首先需要将 MutatingWebhookConfiguration 稍作修改,使得 ApiServer 能够与 Webhook Server 通信。具体方法如下:

配置 Server Path

第一步,配置 Server Path;将 service 去掉,换成 url: https://<server_ip>:9443/mutate-app-o0w0o-cn-v1-app ,其中 server_ip 是 Webhook Server 的 ip,如果运行在本地,就是本地的 ip。需要注意的是 url 中的 path 要与 app_webhook.go 中定义的保持一致。

配置证书

第二步,配置 caBundle;由于在 Kube 里,所有与 ApiServer 交互的组件都需要与 ApiServer 进行双向 TLS 认证,我们这里需要先手动签发自签名 CA 证书:

$ openssl genrsa -out ca.key 2048
$ openssl req -x509 -new -nodes -key ca.key -subj "/CN=<server_ip>" -days 10000 -out ca.crt
$ openssl genrsa -out server.key 2048
$ cat << EOF >csr.conf
> [ req ]
> default_bits = 2048
> prompt = no
> default_md = sha256
> req_extensions = req_ext
> distinguished_name = dn
> 
> [ dn ]
> C = <country>
> ST = <state>
> L = <city>
> O = <organization>
> OU = <organization unit>
> CN = <server_ip>
> 
> [ req_ext ]
> subjectAltName = @alt_names
> 
> [ alt_names ]
> IP.1 = <server_ip>
> 
> [ v3_ext ]
> authorityKeyIdentifier=keyid,issuer:always
> basicConstraints=CA:FALSE
> keyUsage=keyEncipherment,dataEncipherment
> extendedKeyUsage=serverAuth,clientAuth
> subjectAltName=@alt_names
> EOF
$ openssl req -new -key server.key -out server.csr -config csr.conf
$ openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 10000 -extensions v3_ext -extfile csr.conf

证书生成后将 server.keyserver.crt 拷贝到 Kubebuilder 设置的 webhook server 的私钥和证书路径下:

webhook server 的私钥路径:$(TMPDIR)/k8s-webhook-server/serving-certs/tls.key
webhook server 的证书路径:$(TMPDIR)/k8s-webhook-server/serving-certs/tls.crt

注:如果 $(TMPDIR) 为空,则默认路径为 "/tmp/k8s-webhook-server/...",但 android 系统默认路径为 "/data/local/tmp/k8s-webhook-server/..."

而 MutatingWebhookConfiguration 中的 caBundle 为 ca.crt 的 base64 编码结果。最终 yaml 结果为:

apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  creationTimestamp: null
  name: mutating-webhook-configuration
webhooks:
- clientConfig:
  caBundle: LS0tLS1CRUdJTiBDRVJ...FLS0tLS0=
  url: https://<server_ip>:9443/mutate-app-o0w0o-cn-v1-app
  failurePolicy: Fail
  name: mapp.kb.io
  rules:
    ...

ValidatingWebhookConfiguration 的修改与 MutatingWebhookConfiguration 类似,只需要注意 server path 与 app_webhook.go 中一致即可。两个配置文件都修改好之后在集群中 apply 一下即可。

运行

最后直接在本地运行 CRD Controller 及 Webhook Server:

make run

验证

简单运行一个 app 试试:

apiVersion: app.o0w0o.cn/v1
kind: App
metadata:
  name: app-sample
spec:
  deploy:
    selector:
      matchLabels:
        app: app-sample
    template:
      metadata:
        name: sample
        labels:
          app: app-sample
      spec:
        containers:
          - name: cn
            image: daocloud.io/library/redis:4.0.14-alpine

查看是否已经注入了 sidecar 容器:

$ kubectl apply -f config/samples/app_v1_app.yaml
$ kubectl get app
NAME         AGE
app-sample   43s
$ kubectl get deploy
NAME                READY   UP-TO-DATE   AVAILABLE   AGE
app-sample-deploy   0/1     1            0           43s
$ kubectl get po
NAME                                 READY   STATUS              RESTARTS   AGE
app-sample-deploy-5b5cfb9c9b-z8jk5   0/2     ContainerCreating   0          43s
2019/10/12 18:17 下午 posted in  Kubernetes