[译]深入剖析 Kubernetes MutatingAdmissionWebhook

翻译自 《Diving into Kubernetes MutatingAdmissionWebhook》
原文链接:
https://medium.com/ibm-cloud/diving-into-kubernetes-mutatingadmissionwebhook-6ef3c5695f74

对于在数据持久化之前,拦截到 Kubernetes API server 的请求,Admission controllers 是非常有用的工具。然而,由于其需要由集群管理员在 kube-apiserver 中编译成二进制文件,所以使用起来不是很灵活。从 Kubernetes 1.7 起,引入了 InitializersExternal Admission Webhooks,用以解决这个问题。在 Kubernetes 1.9 中,Initializers 依然停留在 alpha 版本,然而 External Admission Webhooks 被提升到了 beta 版,且被分成了 MutatingAdmissionWebhookValidatingAdmissionWebhook

MutatingAdmissionWebhookValidatingAdmissionWebhook 二者合起来就是一个特殊类型的 admission controllers,一个处理资源更改,一个处理验证。验证是通过匹配到 MutatingWebhookConfiguration 中定义的规则完成的。

在这篇文章中,我会深入剖析 MutatingAdmissionWebhook 的细节,并一步步实现一个可用的 webhook admission server

Webhooks 的好处

Kubernetes 集群管理员可以使用 webhooks 来创建额外的资源更改及验证准入插件,这些准入插件可以通过 apiserver 的准入链来工作,而不需要重新编译 apiserver。这使得开发者可以对于很多动作都可以自定义准入逻辑,比如对任何资源的创建、更新、删除,给开发者提供了很大的自由和灵活度。可以使用的应用数量巨大。一些常见的使用常见包括:

  1. 在创建资源之前做些更改。Istio 是个非常典型的例子,在目标 pods 中注入 Envoy sidecar 容器,来实现流量管理和规则执行。
  2. 自动配置 StorageClass。监听 PersistentVolumeClaim 资源,并按照事先定好的规则自动的为之增添对应的 StorageClass。使用者无需关心 StorageClass 的创建。
  3. 验证复杂的自定义资源。确保只有其被定义后且所有的依赖项都创建好并可用,自定义资源才可以创建。
  4. namespace 的限制。在多租户系统中,避免资源在预先保留的 namespace 中被创建。

除了以上列出来的使用场景,基于 webhooks 还可以创建更多应用。

Webhooks 和 Initializers

基于社区的反馈,以及对 External Admission WebhooksInitializers 的 alpha 版本的使用案例,Kubernetes 社区决定将 webhooks 升级到 beta 版,并且将其分成两种 webhooks(MutatingAdmissionWebhookValidatingAdmissionWebhook)。这些更新使得 webhooks 与其他 admission controllers 保持一致,并且强制 mutate-before-validateInitializers 可以在 Kubernetes 资源创建前更改,从而实现动态准入控制。如果你对 Initializers 不熟悉,可以参考这篇文章

所以,到底 WebhooksInitializers 之间的区别是什么呢?

  1. Webhooks 可以应用于更多操作,包括对于资源 "增删改" 的 "mutate" 和 "admit";然而 Initializers 不可以对 "删" 资源进行 "admit"。
  2. Webhooks 在创建资源前不允许查询;然而 Initializers 可以监听未初始化的资源,通过参数 ?includeUninitialized=true 来实现。
  3. 由于 Initializers 会把 "预创建" 状态也持久化到 etcd,因此会引入高延迟且给 etcd 带来负担,尤其在 apiserver 升级或失败时;然而 Webhooks 消耗的内存和计算资源更少。
  4. WebhooksInitializers 对失败的保障更强大。Webhooks 的配置中可以配置失败策略,用以避免资源在创建的时候被 hang 住。然而 Initializers 在尝试创建资源的时候可能会 block 住所有的资源。

除了上面列举的不同点,Initializer 在较长一段开发时间内还存在很多已知问题,包括配额补充错误等。Webhooks 升级为 beta 版也就预示着在未来 Webhooks 会是开发目标。如果你需要更稳定的操作,我推荐使用 Webhooks

MutatingAdmissionWebhook 如何工作

MutatingAdmissionWebhook 在资源被持久化到 etcd 前,根据规则将其请求拦截,拦截规则定义在 MutatingWebhookConfiguration 中。MutatingAdmissionWebhook 通过对 webhook server 发送准入请求来实现对资源的更改。而 webhook server 只是一个简单的 http 服务器。

下面这幅图详细描述了 MutatingAdmissionWebhook 如何工作:

MutatingAdmissionWebhook 需要三个对象才能运行:

MutatingWebhookConfiguration

MutatingAdmissionWebhook 需要根据 MutatingWebhookConfiguration 向 apiserver 注册。在注册过程中,MutatingAdmissionWebhook 需要说明:

  1. 如何连接 webhook admission server
  2. 如何验证 webhook admission server
  3. webhook admission server 的 URL path;
  4. webhook 需要操作对象满足的规则;
  5. webhook admission server 处理时遇到错误时如何处理。

MutatingAdmissionWebhook 本身

MutatingAdmissionWebhook 是一种插件形式的 admission controller ,且可以配置到 apiserver 中。MutatingAdmissionWebhook 插件可以从 MutatingWebhookConfiguration 中获取所有感兴趣的 admission webhooks

然后 MutatingAdmissionWebhook 监听 apiserver 的请求,拦截满足条件的请求,并并行执行。

Webhook Admission Server

Webhook Admission Server 只是一个附着到 k8s apiserver 的 http server。对于每一个 apiserver 的请求,MutatingAdmissionWebhook 都会发送一个 admissionReview 到相关的 webhook admission serverwebhook admission server 再决定如何更改资源。

MutatingAdmissionWebhook 教程

编写一个完整的 Webhook Admission Server 可能令人生畏。为了方便起见,我们编写一个简单的 Webhook Admission Server 来实现注入 nginx sidecar 容器以及挂载 volume。完整代码在 kube-mutating-webhook-tutorial。这个项目参考了 Kubernetes webhook 示例Istio sidecar 注入实现

在接下来的段落里,我会向你展示如何编写可工作的容器化 webhook admission server,并将其部署到 Kubernetes 集群中。

前置条件

MutatingAdmissionWebhook 要求 Kubernetes 版本为 1.9.0 及以上,其 admissionregistration.k8s.io/v1beta1 API 可用。确保下面的命令:

kubectl api-versions | grep admissionregistration.k8s.io/v1beta1

其输出为:

admissionregistration.k8s.io/v1beta1

另外,MutatingAdmissionWebhookValidatingAdmissionWebhook 准入控制器需要以正确的顺序加入到 kube-apiserveradmission-control 标签中。

编写 Webhook Server

Webhook Admission Server 是一个简单的 http 服务器,遵循 Kubernetes API。我粘贴部分伪代码来描述主逻辑:

sidecarConfig, err := loadConfig(parameters.sidecarCfgFile)
pair, err := tls.LoadX509KeyPair(parameters.certFile, parameters.keyFile)

whsvr := &WebhookServer {
    sidecarConfig:    sidecarConfig,
    server:           &http.Server {
        Addr:        fmt.Sprintf(":%v", 443),
        TLSConfig:   &tls.Config{Certificates: []tls.Certificate{pair}},
    },
}
    
// define http server and server handler
mux := http.NewServeMux()
mux.HandleFunc("/mutate", whsvr.serve)
whsvr.server.Handler = mux

// start webhook server in new rountine
go func() {
    if err := whsvr.server.ListenAndServeTLS("", ""); err != nil {
        glog.Errorf("Filed to listen and serve webhook server: %v", err)
    }
}()

以上代码的详解:

  1. sidecarCfgFile 包含了 sidecar 注入器模板,其在下面的 ConfigMap 中定义;
  2. certFilekeyFile 是秘钥对,会在 webhook server 和 apiserver 之间的 TLS 通信中用到;
  3. 19 行开启了 https server,以监听 443 端口路径为 '/mutate'

接下来我们关注处理函数 serve 的主要逻辑:

// Serve method for webhook server
func (whsvr *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
    var body []byte
    if r.Body != nil {
        if data, err := ioutil.ReadAll(r.Body); err == nil {
            body = data
        }
    }

    var reviewResponse *v1beta1.AdmissionResponse
    ar := v1beta1.AdmissionReview{}
    deserializer := codecs.UniversalDeserializer()
    if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
        glog.Error(err)
        reviewResponse = toAdmissionResponse(err)
    } else {
        reviewResponse = mutate(ar)
    }

    response := v1beta1.AdmissionReview{}
    if reviewResponse != nil {
        response.Response = reviewResponse
        response.Response.UID = ar.Request.UID
    }
    // reset the Object and OldObject, they are not needed in a response.
    ar.Request.Object = runtime.RawExtension{}
    ar.Request.OldObject = runtime.RawExtension{}

    resp, err := json.Marshal(response)
    if err != nil {
        glog.Error(err)
    }
    if _, err := w.Write(resp); err != nil {
        glog.Error(err)
    }
}

函数 serve 是一个简单的 http 处理器,参数为 http requestresponse writer

  1. 首先将请求组装为 AdmissionReview,其中包括 objectoldobjectuserInfo ...
  2. 然后触发 Webhook 主函数 mutate 来创建 patch,以实现注入 sidecar 容器及挂载 volume。
  3. 最后,将 admission decision 和额外 patch 组装成响应,并发送回给 apiserver。

对于函数 mutate 的实现,你可以随意发挥。我就以我的实现方式做个例子:

// main mutation process
func (whsvr *WebhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
    req := ar.Request
    var pod corev1.Pod
    if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
        glog.Errorf("Could not unmarshal raw object: %v", err)
        return &v1beta1.AdmissionResponse {
            Result: &metav1.Status {
                Message: err.Error(),
            },
        }
    }
    
    // determine whether to perform mutation
    if !mutationRequired(ignoredNamespaces, &pod.ObjectMeta) {
        glog.Infof("Skipping mutation for %s/%s due to policy check", pod.Namespace, pod.Name)
        return &v1beta1.AdmissionResponse {
            Allowed: true, 
        }
    }

    annotations := map[string]string{admissionWebhookAnnotationStatusKey: "injected"}
    patchBytes, err := createPatch(&pod, whsvr.sidecarConfig, annotations)
    
    return &v1beta1.AdmissionResponse {
        Allowed: true,
        Patch:   patchBytes,
        PatchType: func() *v1beta1.PatchType {
            pt := v1beta1.PatchTypeJSONPatch
            return &pt
        }(),
    }
}

从上述代码中可以看出,函数 mutate 请求了 mutationRequired 来决定这个改动是否被允许。对于被允许的请求,函数 mutate 从另一个函数 createPatch 中获取到修改体 'patch'。注意这里函数 mutationRequired 的诡计,我们跳过了带有注解 sidecar-injector-webhook.morven.me/inject: true。这里稍后会在部署 deployment 的时候提到。完整代码请参考这里

编写 Dockerfile 并构建

创建构建脚本:

dep ensure
CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o kube-mutating-webhook-tutorial .
docker build --no-cache -t morvencao/sidecar-injector:v1 .
rm -rf kube-mutating-webhook-tutorial

docker push morvencao/sidecar-injector:v1

以下面为依赖编写 Dockerfile 文件:

FROM alpine:latest

ADD kube-mutating-webhook-tutorial /kube-mutating-webhook-tutorial
ENTRYPOINT ["./kube-mutating-webhook-tutorial"]

在手动构建容器前,你需要 Docker ID 账号并将 image name 和 tag (Dockerfile 和 deployment.yaml 文件中)修改成你自己的,然后执行以下命令:

[root@mstnode kube-mutating-webhook-tutorial]# ./build
Sending build context to Docker daemon  44.89MB
Step 1/3 : FROM alpine:latest
 ---> 3fd9065eaf02
Step 2/3 : ADD kube-mutating-webhook-tutorial /kube-mutating-webhook-tutorial
 ---> 432de60c2b3f
Step 3/3 : ENTRYPOINT ["./kube-mutating-webhook-tutorial"]
 ---> Running in da6e956d1755
Removing intermediate container da6e956d1755
 ---> 619faa936145
Successfully built 619faa936145
Successfully tagged morvencao/sidecar-injector:v1
The push refers to repository [docker.io/morvencao/sidecar-injector]
efd05fe119bb: Pushed
cd7100a72410: Layer already exists
v1: digest: sha256:7a4889928ec5a8bcfb91b610dab812e5228d8dfbd2b540cd7a341c11f24729bf size: 739

编写 Sidecar 注入配置

现在我们来创建一个 Kubernetes ConfigMap,包含需要注入到目标 pod 中的容器和 volume 信息:

apiVersion: v1
kind: ConfigMap
metadata:
  name: sidecar-injector-webhook-configmap
data:
  sidecarconfig.yaml: |
    containers:
      - name: sidecar-nginx
        image: nginx:1.12.2
        imagePullPolicy: IfNotPresent
        ports:
          - containerPort: 80
        volumeMounts:
          - name: nginx-conf
            mountPath: /etc/nginx
    volumes:
      - name: nginx-conf
        configMap:
          name: nginx-configmap

从上面的清单中看,这里需要另一个包含 nginx confConfigMap。完整 yaml 参考 nginxconfigmap.yaml

然后将这两个 ConfigMap 部署到集群中:

[root@mstnode kube-mutating-webhook-tutorial]# kubectl create -f ./deployment/nginxconfigmap.yaml
configmap "nginx-configmap" created
[root@mstnode kube-mutating-webhook-tutorial]# kubectl create -f ./deployment/configmap.yaml
configmap "sidecar-injector-webhook-configmap" created

创建包含秘钥对的 Secret

由于准入控制是一个高安全性操作,所以对外在的 webhook server 提供 TLS 是必须的。作为流程的一部分,我们需要创建由 Kubernetes CA 签名的 TLS 证书,以确保 webhook server 和 apiserver 之间通信的安全性。对于 CSR 创建和批准的完整步骤,请参考 这里

简单起见,我们参考了 Istio 的脚本并创建了一个类似的名为 webhook-create-signed-cert.sh 的脚本,来自动生成证书及秘钥对并将其加入到 secret 中。

#!/bin/bash
while [[ $# -gt 0 ]]; do
    case ${1} in
        --service)
            service="$2"
            shift
            ;;
        --secret)
            secret="$2"
            shift
            ;;
        --namespace)
            namespace="$2"
            shift
            ;;
    esac
    shift
done

[ -z ${service} ] && service=sidecar-injector-webhook-svc
[ -z ${secret} ] && secret=sidecar-injector-webhook-certs
[ -z ${namespace} ] && namespace=default

csrName=${service}.${namespace}
tmpdir=$(mktemp -d)
echo "creating certs in tmpdir ${tmpdir} "

cat <<EOF >> ${tmpdir}/csr.conf
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = @alt_names
[alt_names]
DNS.1 = ${service}
DNS.2 = ${service}.${namespace}
DNS.3 = ${service}.${namespace}.svc
EOF

openssl genrsa -out ${tmpdir}/server-key.pem 2048
openssl req -new -key ${tmpdir}/server-key.pem -subj "/CN=${service}.${namespace}.svc" -out ${tmpdir}/server.csr -config ${tmpdir}/csr.conf

# clean-up any previously created CSR for our service. Ignore errors if not present.
kubectl delete csr ${csrName} 2>/dev/null || true

# create  server cert/key CSR and  send to k8s API
cat <<EOF | kubectl create -f -
apiVersion: certificates.k8s.io/v1beta1
kind: CertificateSigningRequest
metadata:
  name: ${csrName}
spec:
  groups:
  - system:authenticated
  request: $(cat ${tmpdir}/server.csr | base64 | tr -d '\n')
  usages:
  - digital signature
  - key encipherment
  - server auth
EOF

# verify CSR has been created
while true; do
    kubectl get csr ${csrName}
    if [ "$?" -eq 0 ]; then
        break
    fi
done

# approve and fetch the signed certificate
kubectl certificate approve ${csrName}
# verify certificate has been signed
for x in $(seq 10); do
    serverCert=$(kubectl get csr ${csrName} -o jsonpath='{.status.certificate}')
    if [[ ${serverCert} != '' ]]; then
        break
    fi
    sleep 1
done
if [[ ${serverCert} == '' ]]; then
    echo "ERROR: After approving csr ${csrName}, the signed certificate did not appear on the resource. Giving up after 10 attempts." >&2
    exit 1
fi
echo ${serverCert} | openssl base64 -d -A -out ${tmpdir}/server-cert.pem


# create the secret with CA cert and server cert/key
kubectl create secret generic ${secret} \
        --from-file=key.pem=${tmpdir}/server-key.pem \
        --from-file=cert.pem=${tmpdir}/server-cert.pem \
        --dry-run -o yaml |
    kubectl -n ${namespace} apply -f -

运行脚本后,包含证书和秘钥对的 secret 就被创建出来了:

[root@mstnode kube-mutating-webhook-tutorial]# ./deployment/webhook-create-signed-cert.sh
creating certs in tmpdir /tmp/tmp.wXZywp0wAF
Generating RSA private key, 2048 bit long modulus
...........................................+++
..........+++
e is 65537 (0x10001)
certificatesigningrequest "sidecar-injector-webhook-svc.default" created
NAME                                   AGE       REQUESTOR                                           CONDITION
sidecar-injector-webhook-svc.default   0s        https://mycluster.icp:9443/oidc/endpoint/OP#admin   Pending
certificatesigningrequest "sidecar-injector-webhook-svc.default" approved
secret "sidecar-injector-webhook-certs" created

创建 Sidecar 注入器的 Deployment 和 Service

deployment 带有一个 pod,其中运行的就是 sidecar-injector 容器。该容器以特殊参数运行:

  1. sidecarCfgFile 指的是 sidecar 注入器的配置文件,挂载自上面创建的 ConfigMap sidecar-injector-webhook-configmap
  2. tlsCertFiletlsKeyFile 是秘钥对,挂载自 Secret injector-webhook-certs
  3. alsologtostderrv=42>&1 是日志参数。
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: sidecar-injector-webhook-deployment
  labels:
    app: sidecar-injector
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: sidecar-injector
    spec:
      containers:
        - name: sidecar-injector
          image: morvencao/sidecar-injector:v1
          imagePullPolicy: IfNotPresent
          args:
            - -sidecarCfgFile=/etc/webhook/config/sidecarconfig.yaml
            - -tlsCertFile=/etc/webhook/certs/cert.pem
            - -tlsKeyFile=/etc/webhook/certs/key.pem
            - -alsologtostderr
            - -v=4
            - 2>&1
          volumeMounts:
            - name: webhook-certs
              mountPath: /etc/webhook/certs
              readOnly: true
            - name: webhook-config
              mountPath: /etc/webhook/config
      volumes:
        - name: webhook-certs
          secret:
            secretName: sidecar-injector-webhook-certs
        - name: webhook-config
          configMap:
            name: sidecar-injector-webhook-configmap

Service 暴露带有 app=sidecar-injector label 的 pod,使之在集群中可访问。这个 Service 会被 MutatingWebhookConfiguration 中定义的 clientConfig 部分访问,默认的端口 spec.ports.port 需要设置为 443。

apiVersion: v1
kind: Service
metadata:
  name: sidecar-injector-webhook-svc
  labels:
    app: sidecar-injector
spec:
  ports:
  - port: 443
    targetPort: 443
  selector:
    app: sidecar-injector

然后将上述 Deployment 和 Service 部署到集群中,并且验证 sidecar 注入器的 webhook server 是否 running:

[root@mstnode kube-mutating-webhook-tutorial]# kubectl create -f ./deployment/deployment.yaml
deployment "sidecar-injector-webhook-deployment" created
[root@mstnode kube-mutating-webhook-tutorial]# kubectl create -f ./deployment/service.yaml
service "sidecar-injector-webhook-svc" created
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get deployment
NAME                                  DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
sidecar-injector-webhook-deployment   1         1         1            1           2m
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get pod
NAME                                                  READY     STATUS    RESTARTS   AGE
sidecar-injector-webhook-deployment-bbb689d69-fdbgj   1/1       Running   0          3m

动态配置 webhook 准入控制器

MutatingWebhookConfiguration 中具体说明了哪个 webhook admission server 是被使用的并且哪些资源受准入服务器的控制。建议你在创建 MutatingWebhookConfiguration 之前先部署 webhook admission server,并确保其正常工作。否则,请求会被无条件接收或根据失败规则被拒。

现在,我们根据下面的内容创建 MutatingWebhookConfiguration

apiVersion: admissionregistration.k8s.io/v1beta1
kind: MutatingWebhookConfiguration
metadata:
  name: sidecar-injector-webhook-cfg
  labels:
    app: sidecar-injector
webhooks:
  - name: sidecar-injector.morven.me
    clientConfig:
      service:
        name: sidecar-injector-webhook-svc
        namespace: default
        path: "/mutate"
      caBundle: ${CA_BUNDLE}
    rules:
      - operations: [ "CREATE" ]
        apiGroups: [""]
        apiVersions: ["v1"]
        resources: ["pods"]
    namespaceSelector:
      matchLabels:
        sidecar-injector: enabled

第 8 行:name - webhook 的名字,必须指定。多个 webhook 会以提供的顺序排序;
第 9 行:clientConfig - 描述了如何连接到 webhook admission server 以及 TLS 证书;
第 15 行:rules - 描述了 webhook server 处理的资源和操作。在我们的例子中,只拦截创建 pods 的请求;
第 20 行:namespaceSelector - namespaceSelector 根据资源对象是否匹配 selector 决定了是否针对该资源向 webhook server 发送准入请求。

在部署 MutatingWebhookConfiguration 前,我们需要将 ${CA_BUNDLE} 替换成 apiserver 的默认 caBundle。我们写个脚本来自动匹配:

#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail

ROOT=$(cd $(dirname $0)/../../; pwd)

export CA_BUNDLE=$(kubectl get configmap -n kube-system extension-apiserver-authentication -o=jsonpath='{.data.client-ca-file}' | base64 | tr -d '\n')

if command -v envsubst >/dev/null 2>&1; then
    envsubst
else
    sed -e "s|\${CA_BUNDLE}|${CA_BUNDLE}|g"
fi

然后执行:

[root@mstnode kube-mutating-webhook-tutorial]# cat ./deployment/mutatingwebhook.yaml |\
>   ./deployment/webhook-patch-ca-bundle.sh >\
>   ./deployment/mutatingwebhook-ca-bundle.yaml

我们看不到任何日志描述 webhook server 接收到准入请求,似乎该请求并没有发送到 webhook server 一样。所以有一种可能性是这是被 MutatingWebhookConfiguration 中的配置触发的。再确认一下 MutatingWebhookConfiguration 我们会发现下面的内容:

namespaceSelector:
      matchLabels:
        sidecar-injector: enabled

通过 namespaceSelector 控制 sidecar 注入器

我们在 MutatingWebhookConfiguration 中配置了 namespaceSelector,也就意味着只有在满足条件的 namespace 下的资源能够被发送到  webhook server。于是我们将 default 这个 namespace 打上标签 sidecar-injector=enabled

[root@mstnode kube-mutating-webhook-tutorial]# kubectl label namespace default sidecar-injector=enabled
namespace "default" labeled
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get namespace -L sidecar-injector
NAME          STATUS    AGE       sidecar-injector
default       Active    1d        enabled
kube-public   Active    1d
kube-system   Active    1d

现在我们配置的 MutatingWebhookConfiguration 会在 pod 创建的时候就注入 sidecar 容器。将运行中的 pod 删除并确认是否创建了新的带 sidecar 容器的 pod:

[root@mstnode kube-mutating-webhook-tutorial]# kubectl delete pod sleep-6d79d8dc54-r66vz
pod "sleep-6d79d8dc54-r66vz" deleted
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get pods
NAME                                                  READY     STATUS              RESTARTS   AGE
sidecar-injector-webhook-deployment-bbb689d69-fdbgj   1/1       Running             0          29m
sleep-6d79d8dc54-b8ztx                                0/2       ContainerCreating   0          3s
sleep-6d79d8dc54-r66vz                                1/1       Terminating         0          11m
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get pod sleep-6d79d8dc54-b8ztx -o yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubernetes.io/psp: default
    sidecar-injector-webhook.morven.me/inject: "true"
    sidecar-injector-webhook.morven.me/status: injected
  labels:
    app: sleep
    pod-template-hash: "2835848710"
  name: sleep-6d79d8dc54-b8ztx
  namespace: default
spec:
  containers:
  - command:
    - /bin/sleep
    - infinity
    image: tutum/curl
    imagePullPolicy: IfNotPresent
    name: sleep
    resources: {}
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-d7t2r
      readOnly: true
  - image: nginx:1.12.2
    imagePullPolicy: IfNotPresent
    name: sidecar-nginx
    ports:
    - containerPort: 80
      protocol: TCP
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /etc/nginx
      name: nginx-conf
  volumes:
  - name: default-token-d7t2r
    secret:
      defaultMode: 420
      secretName: default-token-d7t2r
  - configMap:
      defaultMode: 420
      name: nginx-configmap
    name: nginx-conf
...

可以看到,sidecar 容器和 volume 被成功注入到应用中。至此,我们成功创建了可运行的带 MutatingAdmissionWebhook 的 sidecar 注入器。通过 namespaceSelector,我们可以轻易的控制在特定的 namespace 中的 pods 是否需要被注入 sidecar 容器。

但这里有个问题,根据以上的配置,在 default 这个 namespace 下的所有 pods 都会被注入 sidecar 容器,无一例外。

通过注解控制 sidecar 注入器

多亏了 MutatingAdmissionWebhook 的灵活性,我们可以轻易的自定义变更逻辑来筛选带有特定注解的资源。还记得上面提到的注解 sidecar-injector-webhook.morven.me/inject: "true" 吗?在 sidecar 注入器中这可以当成另一种控制方式。在 webhook server 中我写了一段逻辑来跳过那行不带这个注解的 pod。

我们来尝试一下。在这种情况下,我们创建另一个 sleep 应用,其 podTemplateSpec 中不带注解 sidecar-injector-webhook.morven.me/inject: "true"

[root@mstnode kube-mutating-webhook-tutorial]# kubectl delete deployment sleep
deployment "sleep" deleted
[root@mstnode kube-mutating-webhook-tutorial]# cat <<EOF | kubectl create -f -
apiVersion: extensions/v1beta1
> kind: Deployment
> metadata:
>   name: sleep
> spec:
>   replicas: 1
>   template:
>     metadata:
>       labels:
>         app: sleep
>     spec:
>       containers:
>       - name: sleep
>         image: tutum/curl
>         command: ["/bin/sleep","infinity"]
>         imagePullPolicy: IfNotPresent
> EOF
deployment "sleep" created

然后确认 sidecar 注入器是否跳过了这个 pod:

[root@mstnode kube-mutating-webhook-tutorial]# kubectl get deployment
NAME                                  DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
sidecar-injector-webhook-deployment   1         1         1            1           45m
sleep                                 1         1         1            1           17s
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get pod
NAME                                                  READY     STATUS        RESTARTS   AGE
sidecar-injector-webhook-deployment-bbb689d69-fdbgj   1/1       Running       0          45m
sleep-776b7bcdcd-4bz58                                1/1       Running       0          21s

结果显示,这个 sleep 应用只包含一个容器,没有额外的容器和 volume 注入。然后我们将这个 deployment 增加注解,并确认其重建后是否被注入 sidecar:

[root@mstnode kube-mutating-webhook-tutorial]# kubectl patch deployment sleep -p '{"spec":{"template":{"metadata":{"annotations":{"sidecar-injector-webhook.morven.me/inject": "true"}}}}}'
deployment "sleep" patched
[root@mstnode kube-mutating-webhook-tutorial]# kubectl delete pod sleep-776b7bcdcd-4bz58
pod "sleep-776b7bcdcd-4bz58" deleted
[root@mstnode kube-mutating-webhook-tutorial]# kubectl get pods
NAME                                                  READY     STATUS              RESTARTS   AGE
sidecar-injector-webhook-deployment-bbb689d69-fdbgj   1/1       Running             0          49m
sleep-3e42ff9e6c-6f87b                                0/2       ContainerCreating   0          18s
sleep-776b7bcdcd-4bz58                                1/1       Terminating         0          3m

与预期一致,pod 被注入了额外的 sidecar 容器。至此,我们就获得了一个可工作的 sidecar 注入器,可由 namespaceSelector 或更细粒度的由注解控制。

总结

MutatingAdmissionWebhook 是 Kubernetes 扩展功能中最简单的方法之一,工作方式是通过全新规则控制、资源更改。

此功能使得更多的工作模式变成了可能,并且支持了更多生态系统,包括服务网格平台 Istio。从 Istio 0.5.0 开始,Istio 的自动注入部分的代码被重构,实现方式从 initializers 变更为 MutatingAdmissionWebhook

参考文献

2019/4/28 posted in  Kubernetes

Alembic 速查笔记

Alembic 命令行

初始化

$ cd yourproject
$ alembic init alembic

创建一次 alembic

alembic revision -m "***"

upgrade / downgrade

alembic upgrade head  # 升级到最新版本
alembic upgrade +2
alembic downgrade -1
alembic downgrade base  # 回退到最开始的版本

获取 alembic 版本信息

alembic history
alembic current
alembic heads
alembic branches

自动生成

alembic revision --autogenerate -m "Added account table"

Alembic 语法

增加表

from alembic import op
import sqlalchemy as sa
def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
      'account',
      sa.Column('id', sa.Integer()),
      sa.Column('name', sa.String(length=50), nullable=False),
      sa.Column('description', sa.VARCHAR(200)),
      sa.Column('last_transaction_date', sa.DateTime()),
      sa.PrimaryKeyConstraint('id')
    )

公式:

op.create_table(<表名>, sa.Column(<列名>, *属性))

删除表

def downgrade():
  ### commands auto generated by Alembic - please adjust! ###
  op.drop_table("account")
  ### end Alembic commands ###

公式:

op.drop_table(<表名>)

增加一列

from alembic import op
from sqlalchemy import Column,String
    
op.add_column('organization',
    Column('name',String())
)

公式:

op.add_column(<表名>, Column(<列名>, *属性))

删除一列

op.drop_column('organization','name' )

公式:

op.drop_column(<表名>, <列名>)

修改列属性

op.alter_column('user', 'name', new_column_name='username',
                    existing_type=mysql.VARCHAR(length=20))

公式:

alter_column(<表名>, <旧列名>, new_column_name=<新列名>, existing_type=<旧字段类型>, type_=<字段类型修改后>)

注:当表中有数据时,修改字段类型无效或报错

分批处理

with op.batch_alter_table("some_table") as batch_op:
    batch_op.add_column(Column('foo', Integer))
    batch_op.drop_column('bar')

执行 SQL 语句

sql="""ALTER TABLE actions alter column finished_at type float;"""
conn=op.get_bind()
conn.execute(sql)

插入数据

# 在已有表内插入数据
from alembic import op
from sqlalchemy.sql import table,column
from sqlalchemy import String, Integer, Date
    
#Create an ad-hoc table to use for the insert statement.
accounts_table=table('test',
    column('id',Integer),
    column('name',String),
)
    
op.bulk_insert(accounts_table,
    [
        {'id':1,'name':'JohnSmith'},
        {'id':2,'name':'EdWilliams'},
        {'id':3,'name':'WendyJones'},
    ]
)

更新版本,但不操作实际的 upgrade 内容(慎用)

alembic stamp head 
2019/4/25 posted in  Python

RabbitMQ 模型和死信队列

RabbitMQ 模型

RabbitMQ 是一个生产者/消费者模型,生产者生产消息到队列中,而消费者从队列中拿消息进行消费,两者并不直接交互。

我们首先来看看 RabbitMQ 的模型结构

在图中,我们可以看到,整个结构包括:生产者 Producer、交换机 Exchange、队列 Queue,以及消费者 Consumer。

其中,生产者和消费者与 MQ 连接时会创建 TCP 连接和信道,生产者生产消息,根据其指定的 RoutingKey 已经交换机连接 Queue 的 BindingKey,两者共同决定将消息发送到哪个队列中。

下面我们逐一分析各个部分的功能。

Channel


每个生产者或消费者都需要与 RabbitMQ Broker 建立 TCP 连接,即 Connection。Connection 建立起来之后,客户端会创建一个 AMQP 信道,即 Channel,这是基于 Connection 的虚拟连接,多条信道复用一条 TCP 连接,不仅减少性能开销,同时也便于管理。

用 Python 的 pika 包实现 TCP 连接和信道创建:

class MqClient:
    def __init__(self, *, mq_host, mq_port, username, password):
        credentials = pika.PlainCredentials(username, password)
        conn_params = pika.ConnectionParameters(
            host=mq_host,
            port=mq_port,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

Exchange

生产者通常将消息发送给交换机,而交换机再将消息路由到队列中,若路由不到,要么返回队列要么丢弃。

用 Python 实现交换机:

    def create_exchange(self, change_name):
        self.channel.exchange_declare(
            exchange=change_name,
            exchange_type='topic',
            passive=False,
            durable=True,
            auto_delete=False
        )

交换机通常分四种类型:

  1. fanout:将所有发送到该交换机的消息路由到所有与该交换机绑定的队列中;
  2. direct:将消息路由到 BindingKey 与 RoutingKey 完全一致的队列中;
  3. topic:将消息路由到 BindingKey 与 RoutingKey 匹配的队列中,匹配的规则包括 以 '.' 为分割、以 '*' 和 '#' 做模糊匹配;
  4. headers:该类型的交换机根据消息内容中国的 headers 属性进行匹配。

Queue

队列是 RabbitMQ 中用以存储消息的对象。多个消费者可以订阅同一个队列,而队列中的消息会被均摊到各个消费者,而不是每个消费者都收到所有的消息。

    def create_queue(self, exchange_name, queue_name, routing_key):
        self.channel.queue_declare(
            queue=queue_name,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

RoutingKey & BindingKey

RoutingKey 即路由键,通常是生产者发送消息时指定的。BindingKey 即绑定键,通常用于交换机与队列绑定。

二者通常配合起来使用,比如 direct 和 topic 类型的交换机在路由消息时,都是看这两个键是否匹配。某种情况下,RoutingKey 和 BindingKey 可以看做同一个东西。

    def queue_bind(self, exchange_name, queue_name, routing_key):
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key,
        )

Publish/Subscribe 机制

RabbitMQ 消息的消费模式通常分为推模式和拉模式。

推模式采用的是订阅的方式,使用的是 basic_consume 方法 ;而拉模式采用的是从队列中获取消息的方式,使用的是 basic_get 方法。拉模式通常运用于获取单挑消息的场合,对于持续获取消息或者需要实现高吞吐量的场合,推模式更适合。

下面是一个推模式的例子:

def msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[Consumer] Receive message:")
        print("           {}: {}".format(method_frame.routing_key, body))
        time.sleep(1)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        print("[Consumer] Reject message and return it to queue!")
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=True)
    return


def msg_publisher(channel, *, exchange, routing_key):
    # Send a message
    data = "hahahahhahahahaha! I'm a bug and you can't catch me!"
    if channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=data,
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=1),
            mandatory=True):
        print('[Producer] Message was published')
    else:
        print('[Producer] Message was returned')
        
if __name__ == "__main__":
    client = MqClient(
        mq_host="172.16.110.17",
        mq_port=5672,
        username="guest",
        password="guest",
    )
    channel = client.get_channel()

    # 设置生产者
    msg_publisher(channel,
                  exchange=EXCHANGE_NAME,
                  routing_key="hdls.miao.message")
    # 设置消费者
    channel.basic_consume(
        msg_consumer,
        queue=QUEUE_NAME,
        no_ack=False,
        consumer_tag="hdls-consumer"
    )
    # 开始消费
    channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    client.connection.close()

消费者收到消息后进行收到 ack,得到的结果如下:

[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"

若对于该条消息,消费者不消费,而是拒绝:basic_nack,而拒绝的同时将参数设置为 requeue=True,即将消息打回队列,则得到的结果如下:

[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
...

此时该条消息就会一直被打回队列,就一直堵在队列中:

死信

当一个消息被拒绝而被打回队列,而此后该消息没有消费者接收,成了死信,就会堵住队列,当队列中死信越来越多时,队列的性能会受到影响。对于死信的处理,设置死信队列是个很好的选择。

死信通常有下面几种情况:

  1. 消息被拒绝(通过 basic.reject 方法或 basic.nack 方法),同时被打回队列;
  2. 消息本身设置了 TTL 或队列设置了 TTL,且达到了过期时间;
  3. 队列可持有消息数量达到了上限。

死信交换机

当消息在一个队列中成为死信时,就能够被发送到另一个交换机中,也就是死信交换机。死信交换机其实就是普通的交换机,不过绑定的是死信队列,其声明和使用与普通交换机一致。

死信队列

死信队列就是用来接收死信的队列,但其本质与普通队列一样。只不过在设置普通队列的时候需要给其定义死信交换机是哪个,当消息成为死信时,以什么样的 routing_key 来路由到死信队列里去。这样所有的死信就可以被路由到对应的死信队列中去了。

需要注意的是,在声明普通队列的死信设置之前,死信交换机和死信队列需要先存在。

根据定义将上面的普通队列做修改:

    def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在声明队列时,需要声明两个参数即可:x-dead-letter-exchangex-dead-letter-routing-key。同时在声明普通队列之前声明死信队列:

    def connect(self):
        self.create_exchange(DEAD_EXCHANGE_NAME)
        self.create_queue(DEAD_EXCHANGE_NAME,
                          DEAD_QUEUE_NAME, DEAD_ROUTING_KEY, is_dead=True)

        self.create_exchange(EXCHANGE_NAME)
        self.create_queue(EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY)

如果同时加上死信队列的消费者,就可以统一处理死信了:

def dead_msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[DEAD CSM] Dead Message! It's time to put it to dead queue.")
        print("           {}: {}".format(method_frame.routing_key, body))
        print("           ACK dead message!")
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=False)
    return

万事俱备!但运行后居然报错:

pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'hdls.mq.queue' in vhost '/': received the value 'dead.exchange' of type 'longstr' but current is none")

这是因为之前我们已经声明过不加死信设置的队列了,声明 queue 时试图设定一个 x-dead-letter-exchange 参数,当前服务器上该 queue 的该参数为 none,服务器不允许所以报错。

此时有两种解决方法:一是在服务器上将之前的 queue 删除,加上死信参数,再次声明队列;二是通过 policy 来设置这个参数。

policy 可以用 rabbitmqctl set_policy设置,也可以在 RabbitMQ 的前端页面进行:

需要注意的是,通过 policy 方法设置时参数为 dead-letter-exchange dead-letter-routing-key,而用第一种方法中的死信参数需要加上 x- 前缀。

将死信设置加上后,再次启用 consumer:

[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[DEAD CSM] Dead Message! It's time to put it to dead queue.
           dead.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
           ACK dead message!

可以看到,消息在打回队列后就被路由到了死信队列。

延迟队列

所谓延迟队列,指的是消息发送后,并不想立即被消费者拿到,希望在指定时间后,消费者才拿到消息。

延迟队列可以用死信队列来实现。利用队列或消息的 TTL 特性,可以做到消息在指定时间内超时后被路由到死信队列,而此时死信队列就可以当做延迟队列来做消息处理。

    def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-message-ttl": 3000,
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在普通队列的死信设置里加上一条 x-message-ttl 就可以设置消息的 TTL。

[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[DELAY CSM] Delay queue receive the message!
            dead.message: b"I'm not bug, but you can only receive me in 3 seconds."
            ACK delay message!
2019/3/28 posted in  中间件

Kubernetes Job 与 CronJob

如果说 Deployment、DaemonSet 等资源为 Kubernetes 承担了长时间、在线计算的能力,那么定时、短期、甚至一次性的离线计算能力,便是 Job 和 CronJob 所承担的事情。

Job

Job 其实就是根据定义起一个或多个 pod 来执行任务,pod 执行完退出后,这个 Job 就完成了。所以 Job 又称为 Batch Job ,即计算业务或离线业务。

Job 使用方法

Job 的 YAML 定义与 Deployment 十分相似。与 Deployment 不同的是,Job 不需要定义 spec.selector 来指定需要控制的 pod,看个例子:

apiVersion: batch/v1
kind: Job
metadata:
  name: date
spec:
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "date > /date/date.txt"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

在这个 Job 中,我们定义了一个 Ubuntu 镜像的容器,用于将当前时间输出至宿主机的 /date/date.txt 文件中。将此 Job 创建好后,我们可以查看该 Job 对象:

可以看到,Job 在创建后被加上了 controller-uid=***** 的 Label,和与之对应的 Label Selector,从而保证了 Job 与它所管理的 Pod 之间的匹配关系。查看 pod 可以看到相同的 Label:

pod 在执行完毕后,状态会变成 Completed,我们可以去 pod 被调度的 node 上查看我们挂载进去的 date.txt 文件:

[root@rancher-node3 ~]# cat /date/date.txt
Sat Dec 22 16:09:48 UTC 2018

pod 重启策略

在 Job 中,pod 的重启策略 restartPolicy 不允许被设置成 Always,只允许被设置为 Never 或 OnFailure。这是因为 Job 的 pod 执行完毕后直接退出,如果 restartPolicy=Always,pod 将不断执行计算作业,这可不是我们期望的。

Job 可以设置 pod 的最长运行时间 spec.activeDeadlineSeconds,一旦超过了这个时间,这个 Job 的所有 pod 都会被终止。

那么,如果 pod 的计算作业失败了,在不同的重启策略下会怎么办?

restartPolicy=Never

如果设置了 restartPolicy=Never,那么 Job Controller 会不断的尝试创建一个新的 pod 出来,默认尝试 6 次。当然这个值可以设置,即 Job 对象的 spec.backoffLimit 字段。

需要注意的是,重新创建 Pod 的间隔是呈指数增加的。

restartPolicy=OnFailure

如果设置了 restartPolicy=Never,那么 Job Controller 会不断的重启这个 pod。

Job 工作原理

通过观察 Job 的创建过程,不难看出 Job 维护了两个值 DESIRED 和 SUCCESSFUL,分别表示 spec.completions 和 成功退出的 pod 数。

而在 Job 对象中有两个参数意义重大,它们控制着 Job 的并行任务:
spec.parallelism :定义一个 Job 在任意时间最多可以启动同时运行的 Pod 数;
spec.completions :定义 Job 至少要完成的 Pod 数目,即 Job 的最小完成数。

弄清楚了这两个参数,我们再来看 Job 的工作原理。

首先,Job Controller 控制的直接就是 pod;
在整个 Job 的作业过程中,Job Controller 根据实际在 Running 的 pod 数、已成功退出的 pod 数、parallelism 值、completions 值,计算出当前需要创建或删除的 pod 数,去调用 APIServer 来执行具体操作。

就拿上面的例子说明,比如将 YAML 改成:

apiVersion: batch/v1
kind: Job
metadata:
  name: date
spec:
  parallelism: 2
  completions: 3
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "date >> /date/date.txt"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

第一步:判断当前没有 pod 在 Running,且成功退出 pod 数为 0,当前最多允许 2 个 pod 并行。向 APIServer 发起创建 2 个 pod 的请求。此时 2 个 pod Running,当这 2 个 pod 完成任务并成功退出后,进入第二步;

第二步:当前 Running pod 数为 0,成功退出数为 2,当前最多允许 2 个 pod 并行,Job 最小完成数为 3。则向 APIServer 发起创建 1 个 pod 的请求。此时 1 个 pod Running,当这个 pod 完成任务并成功退出后,进入第三步;

第三步:当前成功退出 pod 数为 3,Job 最小完成数为 3。判断 Job 完成作业。

批处理调度

根据 Job 的这些特性,我们就可以用以实现批处理调度,也就是并行启动多个计算进程去处理一批工作项。根据并行处理的特性,往往将 Job 分为三种类型,即 Job 模板拓展、固定 completions 数的 Job、固定 parallelism 数的 Job。

Job 模板拓展

这种模式最简单粗暴,即将 Job 的 YAML 定义成外界可使用的模板,再由外部控制器使用这些模板来生成单一无并行任务的 Job。比如,我们将上面的例子改写成模板:

apiVersion: batch/v1
kind: Job
metadata:
  name: date-$ITEM
spec:
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "echo item number $ITEM; date >> /date/date.txt; sleep 5s"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

而在使用的时候,只需将 $ITEM 替换掉即可:

cat job.yml | sed "s/\$ITEM/1/" > ./job-test.yaml

除了上面这张简单的基础模板使用,Kubernetes 官网还提供了一种以 jinja2 模板语言实现的多模板参数的模式:

{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: batch/v1
kind: Job
metadata:
  name: jobexample-{{ name }}
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}

在使用这种模式需要确保已经安装了 jinja2 的包:pip install --user jinja2

再执行一条 Python 命令即可替换:

alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
cat job.yaml.jinja2 | render_template > jobs.yaml

或者直接进行 kubectl create:

alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
cat job.yaml.jinja2 | render_template | kubectl create -f -

固定 completions 数的 Job

这种模式就真正实现了并行工作模式,且 Job 的完成数是固定的。

在这种模式下,需要一个存放 work item 的队列,比如 RabbitMQ,我们需要先将要处理的任务变成 work item 放入任务队列。每个 pod 创建时,去队列里获取一个 task,完成后将其从队列里删除,直到完成了定义的 completions 数。

上图描述了一个 completions=6,parallelism=2 的 Job 的示意图。选择 RabbitMQ 来充当这里的工作队列;外部生产者产生 6 个 task ,放入工作队列中;在 pod 模板中定义 BROKER_URL,来作为消费者。一旦创建了这个 Job,就会以并发度为 2 的方式,去消费这些 task,直到任务全部完成。其 yaml 文件如下:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 6
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: myrepo/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
  restartPolicy: OnFailure

固定 parallelism 数的 Job

最后一种模式是指定并行度(parallelism),但不设置固定的 completions 的值。

每个 pod 去队列里拿任务执行,完成后继续去队列里拿任务,直到队列里没有任务,pod 才退出。这种情况下,只要有一个 pod 成功退出,就意味着整个 Job 结束。这种模式对应的是任务总数不固定的场景。

上图描述的是一个并行度为 2 的 Job。RabbitMQ 不能让客户端知道是否没有数据,因此这里采用 Redis 队列;每个 pod 去队列里消费一个又一个任务,直到队列为空后退出。其对应的 yaml 文件如下:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: myrepo/job-wq-2
  restartPolicy: OnFailure

CronJob

Kubernetes 在 v1.5 开始引入了 CronJob 对象,顾名思义,就是定时任务,类似 Linux Cron。先看个例子:

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: cron-date
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: date
            image: ubuntu:16.04
            command: ["sh", "-c", "date >> /date/date.txt"]
            volumeMounts:
            - mountPath: /date
              name: date-volume
          nodeSelector:
            kubernetes.io/hostname: rancher-node3
          volumes:
          - name: date-volume
            hostPath:
              path: /date
          restartPolicy: OnFailure

CronJob 其实就是一个 Job 对象的控制器,需要定义一个 Job 的模板,即 jobTemplate 字段;另外,其定时表达式 schedule 基本上照搬了 Linux Cron 的表达式:

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                   7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * 

创建出该 CronJob 对象后,CronJob 会记录下最近一次 Job 的执行时间:

[root@rancher-node1 jobs]# kubectl get cronjob cron-date
NAME        SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
cron-date   */1 * * * *   False     0        22s             15m
[root@rancher-node1 jobs]# kubectl get job
NAME                   DESIRED   SUCCESSFUL   AGE
cron-date-1545584220   1         1            2m
cron-date-1545584280   1         1            1m
cron-date-1545584340   1         1            23s
[root@rancher-node1 jobs]# kubectl get po
NAME                         READY   STATUS      RESTARTS   AGE
cron-date-1545584220-gzmzw   0/1     Completed   0          2m
cron-date-1545584280-bq9nx   0/1     Completed   0          1m
cron-date-1545584340-84tf2   0/1     Completed   0          27s

如果某些定时任务比较特殊,某个 Job 还没有执行完,下一个新的 Job 就产生了。这种情况可以通过设置 spec.concurrencyPolicy 字段来定义具体策略:

  1. concurrencyPolicy=Allow,这也是默认情况,这意味着这些 Job 可以同时存在;
  2. concurrencyPolicy=Forbid,这意味着不会创建新的 Pod,该创建周期被跳过;
  3. concurrencyPolicy=Replace,这意味着新产生的 Job 会替换旧的、没有执行完的 Job。

Kubernetes 所能容忍的 Job 创建失败数为 100,但是其失败时间窗口可以自定义。即通过字段 spec.startingDeadlineSeconds 可以用来设定这个时间窗口,单位为秒,也就是说在这个时间窗口内最大容忍数为 100,如果超过了 100,这个 Job 就不会再被执行。

2018/12/22 posted in  Kubernetes

Kubernetes 安全机制解读

在 Kubernetes 中,所有资源的访问和变更都是围绕 APIServer 展开的。比如说 kubectl 命令、客户端 HTTP RESTFUL 请求,都是去 call APIServer 的 API 进行的,本文就重点解读 k8s 为了集群安全,都做了些什么。

首先,Kubernetes 官方文档给出了上面这张图。描述了用户在访问或变更资源的之前,需要经过 APIServer 的认证机制、授权机制以及准入控制机制。这三个机制可以这样理解,先检查是否合法用户,再检查该请求的行为是否有权限,最后做进一步的验证或添加默认参数。

用户

Kubernetes 中有两种用户,一种是内置“用户” ServiceAccount,另一种我称之为自然人。

所谓自然人就是指区别于 pod 等资源概念的“人”,可以理解成实际操作 "kubectl" 命令的人。admin 可以分发私钥,但自然人可以储存类似 KeyStone 甚至包含账号密码的文件,所以 k8s 中没有对自然人以 API 对象描述之。

在典型的 Kubernetes 集群中,API 通常服务在 443 端口,APIServer 提供自签名证书。当你使用 kube-up.sh 创建集群用户时,证书会自动在 $USER/.kube/config 中创建出来,而后续用 kubectl 命令访问 APIServer 时,都是用这个证书。

与之相反,k8s 中以 API 对象的形式描述和管理 ServiceAccount。它们被绑定在某个具体的 namespace 中,可以由 APIServer 自动创建出来或手动 call k8s API。

认证机制(Authentication)

k8s 中的认证机制,是在用户访问 APIServer 的第一步。通常是一个完整的 HTTP 请求打过来,但是这一步往往只检测请求头或客户端证书。

认证机制目前有客户端证书、bearer tokens、authenticating proxy、HTTP basic auth 这几种模式。使用方式通常有以下几种:

  1. X509 Client Certs: 客户端证书模式需要在 kubectl 命令中加入 --client-ca-file=<SOMEFILE> 参数,指明证书所在位置。

  2. Static Token File: --token-auth-file=<SOMEFILE> 参数指明 bearer tokens 所在位置。

  3. bearer tokens: 在 HTTP 请求头中加入 Authorization: Bearer <TOKEN>

  4. Bootstrap Tokens: 与 bearer tokens 一致,但 TOKEN 格式为 [a-z0-9]{6}.[a-z0-9]{16}。该方式称为 dynamically-managed Bearer token,以 secret 的方式保存在 kube-system namespace 中,可以被动态的创建和管理。同时,启用这种方式还需要在 APIServer 中打开 --enable-bootstrap-token-auth ,这种方式还处于 alpha 阶段。

  5. Static Password File: 以参数 --basic-auth-file=<SOMEFILE> 指明 basic auth file 的位置。这个 basic auth file 以 csv 文件的形式存在,里面至少包含三个信息:password、username、user id,同时该模式在使用时需要在请求头中加入 Authorization: Basic BASE64ENCODED(USER:PASSWORD)

  6. Service Account Tokens: 该方式通常被 pod 所使用,在 PodSpec 中指明 ServiceAccount 来访问 ApiServer。

除了以上列出来的几种方式外,还有一些比较特殊的访问方式,这里不再详细解读。

授权机制(Authorization)

当用户通过认证后,k8s 的授权机制将对用户的行为等进行授权检查。换句话说,就是对这个请求本身,是否对某资源、某 namespace、某操作有权限限制。

授权机制目前有 4 种模式:RBAC、ABAC、Node、Webhook。下面对这 4 种模式分别做分析。

RBAC

Role-based access control (RBAC) 是基于角色的权限访问控制,通常是对于“内置用户”而言的。该模式是在 k8s v1.6 开发出来的。若要开启该模式,需要在 APIServer 启动时,设置参数 --authorization-mode=RBAC

RBAC 所使用的 API Group 是 rbac.authorization.k8s.io/v1beta1,直到 Kubernetes v1.8 后,RBAC 模块达到稳定水平,所使用的 API Group 为 rbac.authorization.k8s.io/v1

所谓基于角色的权限访问控制,就是对某个用户赋予某个角色,而这个角色通常决定了对哪些资源拥有怎样的权限。

ServiceAccount

首先来看看这个 “内置用户”,在大多时候我们都不使用 “自然人” 这个功能,而是使用 ServiceAccount,再对其他资源授予某个 ServiceAccount,就使得其能够以 “内置用户” 的身份去访问 APIServer。

创建一个 ServiceAccount 很简单,只需要指定其所在 namespace 和 name 即可。举个例子:

apiVersion: v1
kind: ServiceAccount
metadata:
  namespace: hdls
  name: hdls-sa

Role & Rolebinding

RBAC 中最重要的概念就是 RoleRoleBindingRole 定义了一组对 Kubernetes API 对象的操作权限,而 RoleBinding 则定义的是具体的 ServiceAccount 和 Role 的对应关系。

举个 Role 的例子如下:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
    namespace: hdls
    name: hdls-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]

其中:
namespace: 在这里仅限于逻辑上的“隔离”,并不会提供任何实际的隔离或者多租户能力;
rules:定义的是权限规则,允许“被作用者”,对 hdls 下面的 Pod 对象,进行 GET 和 LIST 操作;
apiGroups:为 "" 代表 core API Group;
resources:指的是资源类型,对此还可以进行详细的划分,指定可以操作的资源的名字,比如:

rules:
- apiGroups: [""]
  resources: ["configmaps"]
  resourceNames: ["my-config"]
  verbs: ["get"]

verbs: 指的是具体的操作,当前 Kubernetes(v1.11)里能够对 API 对象进行的所有操作有 "get", "list", "watch", "create", "update", "patch", "delete"。

再看 RoleBinding 的例子:

kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
    name: hdls-rolebinding
    namespace: hdls
subjects:
- kind: ServiceAccount
    name: hdls-sa
    apiGroup: rbac.authorization.k8s.io
roleRef:
    kind: Role
    name: hdls-role
    apiGroup: rbac.authorization.k8s.io

可以看到,这个 RoleBinding 对象里定义了一个 subjects 字段,即“被作用者”。它的类型是 ServiceAccount,就是上面创建的 sa。这个 subjects 还可以是 User 和 Group,User 是指 k8s 里的用户,而 Group 是指 ServiceAccounts。

roleRef 字段是用来直接通过名字,引用我们前面定义的 Role 对象(hdls-role),从而定义了 Subject 和 Role 之间的绑定关系。

此时,我们再用 kubectl get sa -n hdls -o yaml 命令查看之前的 ServiceAccount,就可以看到 ServiceAccount.secret,这是因为 k8s 会为一个 ServiceAccount 自动创建并分配一个 Secret 对象,而这个 Secret 就是用来跟 APIServer 进行交互的授权文件: TokenToken 文件的内容一般是证书或者密码,以一个 Secret 对象的方式保存在 etcd 当中。

这个时候,我们在我们的 Pod 的 YAML 文件中定义字段 .spec.serviceAccountName 为上面的 ServiceAccount name 即可声明使用。

如果一个 Pod 没有声明 serviceAccountName,Kubernetes 会自动在它的 Namespace 下创建一个名叫 default 的默认 ServiceAccount,然后分配给这个 Pod。然而这个默认 ServiceAccount 并没有关联任何 Role。也就是说,此时它有访问 APIServer 的绝大多数权限。

ClusterRole & ClusterRoleBinding

需要注意的是 Role 和 RoleBinding 对象都是 Namespaced 对象,它们只对自己的 Namespace 内的资源有效。

而某个 Role 需要对于非 Namespaced 对象(比如:Node),或者想要作用于所有的 Namespace 的时候,我们需要使用 ClusterRole 和 ClusterRoleBinding 去做授权。

这两个 API 对象的用法跟 Role 和 RoleBinding 完全一样。只不过,它们的定义里,没有了 Namespace 字段。

值得一提的是,Kubernetes 已经内置了很多个为系统保留的 ClusterRole,它们的名字都以 system: 开头。一般来说,这些系统级别的 ClusterRole,是绑定给 Kubernetes 系统组件对应的 ServiceAccount 使用的。

除此之外,Kubernetes 还提供了四个内置的 ClusterRole 来供用户直接使用:

cluster-admin:整个集群的最高权限。如果在 ClusterRoleBinding 中使用,意味着在这个集群中的所有 namespace 中的所有资源都拥有最高权限,为所欲为;如果在 RoleBinding 中使用,即在某个 namespace 中为所欲为。

admin:管理员权限。如果在 RoleBinding 中使用,意味着在某个 namespace 中,对大部分资源拥有读写权限,包括创建 Role 和 RoleBinding 的权限,但没有对资源 quota 和 namespace 本身的写权限。

edit:写权限。在某个 namespace 中,拥有对大部分资源的读写权限,但没有对 Role 和 RoleBinding 的读写权限。

view:读权限。在某个 namespace 中,仅拥有对大部分资源的读权限,没有对 Role 和 RoleBinding 的读权限,也没有对 seccrets 的读权限。

Aggregated ClusterRoles

在 Kubernetes v1.9 之后,ClusterRole 有一种新的定义方法,就是使用 aggregationRule 将多个 ClusterRole 合成一个新的 ClusterRole

首先看个 k8s 官网的例子:

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: monitoring
aggregationRule:
  clusterRoleSelectors:
  - matchLabels:
      rbac.example.com/aggregate-to-monitoring: "true"
rules: []

其中 rules 字段不必定义,会被 controller manager 自动填充。

可以看出 aggregationRule 就是将所有满足 label 条件的 ClusterRole 的合成一个 ClusterRole,而这个新的 ClusterRole 权限为其他总和。

Group

相对于 User 而言,k8s 还拥有“用户组”(Group)的概念,也就是一组“用户”的意思。而对于“内置用户” ServiceAccount 来说,“用户组”的概念也同样适用。

实际上,一个 ServiceAccount,在 Kubernetes 里对应的“用户”的名字是: system:serviceaccount:<ServiceAccount 名字 > ;而它对应的内置“用户组”的名字,就是 system:serviceaccounts:<Namespace 名字 >

对于 Group 的运用,我们举个例子,在 RoleBinding 里这样定义 subjects:

subjects:
- kind: Group
    name: system:serviceaccounts:hdls
    apiGroup: rbac.authorization.k8s.io

这就意味着这个 Role 的权限规则,作用于 hdls 里的所有 ServiceAccount。

而如果 Group 不指定 Namespace,即直接定义为 system:serviceaccounts,意味着作用于整个系统里的所有 ServiceAccount。

ABAC

Attribute-based access control (ABAC) 是基于属性的权限访问控制。若要开启该模式,需要在 APIServer 启动时,开启 --authorization-policy-file=<SOME_FILENAME>--authorization-mode=ABAC 两个参数。

其 policy 文件用来指定权限规则,必须满足每行都是一个 json 对象的格式。可以指定 user 或 group 为某个特定的对象,并描述其拥有的权限。

与 Yaml 文件一致,必须描述的属性有 apiVersion、kind、spec,而 spec 里描述了具体的用户、资源和行为。看个例子:

{"apiVersion": "abac.authorization.kubernetes.io/v1beta1", "kind": "Policy", "spec": {"user": "bob", "namespace": "projectCaribou", "resource": "pods", "readonly": true}}

这就描述了用户 bob 只有在 namespace projectCaribou 下对 pod 的读权限。类似的,这个 User 可以是某个人,也可以是 kubelet 或者某个 ServiceAccount,这里 ServiceAccount 需要写全,比如:system:serviceaccount:kube-system:default

如果是描述某个 namespace 下的所有人,需要用到 group,比如:

{"apiVersion": "abac.authorization.kubernetes.io/v1beta1", "kind": "Policy", "spec": {"group": "system:serviceaccounts:default", "readonly": true, "resource": "pods"}}

Node

Node 授权机制是一种特殊的模式,是 kubelet 发起的请求授权。开启该模式,需要开启参数 --authorization-mode=Node

通过启动 --enable-admission-plugins=...,NodeRestriction,...,来限制 kubelet 访问 node,endpoint、pod、service以及secret、configmap、PV 和 PVC 等相关的资源。

Webhook

Webhook 模式是一种 HTTP 回调模式,是一种通过 HTTP POST 方式实现的简单事件通知。该模式需要 APIServer 配置参数 –authorization-webhook-config-file=<SOME_FILENAME>,HTTP 配置文件的格式跟 kubeconfig 的格式类似。

# Kubernetes API version
apiVersion: v1
# kind of the API object
kind: Config
# clusters refers to the remote service.
clusters:
  - name: name-of-remote-authz-service
    cluster:
      # CA for verifying the remote service.
      certificate-authority: /path/to/ca.pem
      # URL of remote service to query. Must use 'https'. May not include parameters.
      server: https://authz.example.com/authorize

# users refers to the API Server's webhook configuration.
users:
  - name: name-of-api-server
    user:
      client-certificate: /path/to/cert.pem # cert for the webhook plugin to use
      client-key: /path/to/key.pem          # key matching the cert

# kubeconfig files require a context. Provide one for the API Server.
current-context: webhook
contexts:
- context:
    cluster: name-of-remote-authz-service
    user: name-of-api-server
  name: webhook

其中,Cluster 指需要回调的地方的客户端,指定其访问证书和 URL;user 指回调处访问的身份,指明其所需证书和 key;contexts 指回调的内容。

准入控制(Admission Controllers)

在一个请求通过了认证机制和授权认证后,需要经过最后一层筛查,即准入控制。这个准入控制模块的代码通常在 APIServer 中,并被编译到二进制文件中被执行。这一层安全检查的意义在于,检查该请求是否达到系统的门槛,即是否满足系统的默认设置,并添加默认参数。

准入控制以插件的形式存在,开启的方式为:
kube-apiserver --enable-admission-plugins=NamespaceLifecycle,LimitRanger ...

关闭的方式为:
kube-apiserver --disable-admission-plugins=PodNodeSelector,AlwaysDeny ...

常用的准入控制插件有:

  • AlwaysAdmit:允许所有请求通过,被官方反对,因为没有实际意义;
  • AlwaysPullImages:将每个 pod 的 image pull policy 改为 always,在多租户的集群被使用;
  • AlwaysDeny:禁止所有请求通过,被官方反对,因为没有实际意义;
  • DefaultStorageClass:为每个 PersistentVolumeClaim 创建默认的 PV;
  • DefaultTolerationSeconds:如果 pod 对污点 node.kubernetes.io/not-ready:NoExecutenode.alpha.kubernetes.io/unreachable:NoExecute 没有容忍,为其创建默认的 5 分钟容忍 notready:NoExecuteunreachable:NoExecute
  • LimitRanger:确保每个请求都没有超过其 namespace 下的 LimitRange,如果在 Deployment 中使用了 LimitRange 对象,该准入控制插件必须开启;
  • NamespaceAutoProvision:检查请求中对应的 namespace 是否存在,若不存在自动创建;
  • NamespaceExists:检查请求中对应的 namespace 是否存在,若不存在拒绝该请求;
  • NamespaceLifecycle:保证被删除的 namespace 中不会创建新的资源;
  • NodeRestriction:不允许 kubelet 修改 Node 和 Pod 对象;
  • PodNodeSelector:通过读取 namespace 的注解和全局配置,来控制某 namespace 下哪些 label 选择器可被使用;
  • PodPreset:满足预先设置的标准的 pod 不允许被创建;
  • Priority:通过 priorityClassName 来决定优先级;
  • ResourceQuota:保证 namespace 下的资源配额;
  • ServiceAccount:保证 ServiceAccount 的自动创建,如果用到 ServiceAccount,建议开启;

以上只列举了部分,详情请移步 Kubernetes 官方文档。

官方建议:

  • 版本 > v1.10:
--enable-admission-plugins=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota
  • v1.9
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota
  • v1.6 - v1.8
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds
  • v1.4 - v1.5
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,ResourceQuota
2018/11/26 posted in  Kubernetes