Alembic 速查笔记

Alembic 命令行

初始化

$ cd yourproject
$ alembic init alembic

创建一次 alembic

alembic revision -m "***"

upgrade / downgrade

alembic upgrade head  # 升级到最新版本
alembic upgrade +2
alembic downgrade -1
alembic downgrade base  # 回退到最开始的版本

获取 alembic 版本信息

alembic history
alembic current
alembic heads
alembic branches

自动生成

alembic revision --autogenerate -m "Added account table"

Alembic 语法

增加表

from alembic import op
import sqlalchemy as sa
def upgrade():
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
      'account',
      sa.Column('id', sa.Integer()),
      sa.Column('name', sa.String(length=50), nullable=False),
      sa.Column('description', sa.VARCHAR(200)),
      sa.Column('last_transaction_date', sa.DateTime()),
      sa.PrimaryKeyConstraint('id')
    )

公式:

op.create_table(<表名>, sa.Column(<列名>, *属性))

删除表

def downgrade():
  ### commands auto generated by Alembic - please adjust! ###
  op.drop_table("account")
  ### end Alembic commands ###

公式:

op.drop_table(<表名>)

增加一列

from alembic import op
from sqlalchemy import Column,String
    
op.add_column('organization',
    Column('name',String())
)

公式:

op.add_column(<表名>, Column(<列名>, *属性))

删除一列

op.drop_column('organization','name' )

公式:

op.drop_column(<表名>, <列名>)

修改列属性

op.alter_column('user', 'name', new_column_name='username',
                    existing_type=mysql.VARCHAR(length=20))

公式:

alter_column(<表名>, <旧列名>, new_column_name=<新列名>, existing_type=<旧字段类型>, type_=<字段类型修改后>)

注:当表中有数据时,修改字段类型无效或报错

分批处理

with op.batch_alter_table("some_table") as batch_op:
    batch_op.add_column(Column('foo', Integer))
    batch_op.drop_column('bar')

执行 SQL 语句

sql="""ALTER TABLE actions alter column finished_at type float;"""
conn=op.get_bind()
conn.execute(sql)

插入数据

# 在已有表内插入数据
from alembic import op
from sqlalchemy.sql import table,column
from sqlalchemy import String, Integer, Date
    
#Create an ad-hoc table to use for the insert statement.
accounts_table=table('test',
    column('id',Integer),
    column('name',String),
)
    
op.bulk_insert(accounts_table,
    [
        {'id':1,'name':'JohnSmith'},
        {'id':2,'name':'EdWilliams'},
        {'id':3,'name':'WendyJones'},
    ]
)

更新版本,但不操作实际的 upgrade 内容(慎用)

alembic stamp head 
2019/4/25 posted in  Python

RabbitMQ 模型和死信队列

RabbitMQ 模型

RabbitMQ 是一个生产者/消费者模型,生产者生产消息到队列中,而消费者从队列中拿消息进行消费,两者并不直接交互。

我们首先来看看 RabbitMQ 的模型结构

在图中,我们可以看到,整个结构包括:生产者 Producer、交换机 Exchange、队列 Queue,以及消费者 Consumer。

其中,生产者和消费者与 MQ 连接时会创建 TCP 连接和信道,生产者生产消息,根据其指定的 RoutingKey 已经交换机连接 Queue 的 BindingKey,两者共同决定将消息发送到哪个队列中。

下面我们逐一分析各个部分的功能。

Channel


每个生产者或消费者都需要与 RabbitMQ Broker 建立 TCP 连接,即 Connection。Connection 建立起来之后,客户端会创建一个 AMQP 信道,即 Channel,这是基于 Connection 的虚拟连接,多条信道复用一条 TCP 连接,不仅减少性能开销,同时也便于管理。

用 Python 的 pika 包实现 TCP 连接和信道创建:

class MqClient:
    def __init__(self, *, mq_host, mq_port, username, password):
        credentials = pika.PlainCredentials(username, password)
        conn_params = pika.ConnectionParameters(
            host=mq_host,
            port=mq_port,
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(conn_params)
        self.channel = self.connection.channel()

Exchange

生产者通常将消息发送给交换机,而交换机再将消息路由到队列中,若路由不到,要么返回队列要么丢弃。

用 Python 实现交换机:

    def create_exchange(self, change_name):
        self.channel.exchange_declare(
            exchange=change_name,
            exchange_type='topic',
            passive=False,
            durable=True,
            auto_delete=False
        )

交换机通常分四种类型:

  1. fanout:将所有发送到该交换机的消息路由到所有与该交换机绑定的队列中;
  2. direct:将消息路由到 BindingKey 与 RoutingKey 完全一致的队列中;
  3. topic:将消息路由到 BindingKey 与 RoutingKey 匹配的队列中,匹配的规则包括 以 '.' 为分割、以 '*' 和 '#' 做模糊匹配;
  4. headers:该类型的交换机根据消息内容中国的 headers 属性进行匹配。

Queue

队列是 RabbitMQ 中用以存储消息的对象。多个消费者可以订阅同一个队列,而队列中的消息会被均摊到各个消费者,而不是每个消费者都收到所有的消息。

    def create_queue(self, exchange_name, queue_name, routing_key):
        self.channel.queue_declare(
            queue=queue_name,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

RoutingKey & BindingKey

RoutingKey 即路由键,通常是生产者发送消息时指定的。BindingKey 即绑定键,通常用于交换机与队列绑定。

二者通常配合起来使用,比如 direct 和 topic 类型的交换机在路由消息时,都是看这两个键是否匹配。某种情况下,RoutingKey 和 BindingKey 可以看做同一个东西。

    def queue_bind(self, exchange_name, queue_name, routing_key):
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key,
        )

Publish/Subscribe 机制

RabbitMQ 消息的消费模式通常分为推模式和拉模式。

推模式采用的是订阅的方式,使用的是 basic_consume 方法 ;而拉模式采用的是从队列中获取消息的方式,使用的是 basic_get 方法。拉模式通常运用于获取单挑消息的场合,对于持续获取消息或者需要实现高吞吐量的场合,推模式更适合。

下面是一个推模式的例子:

def msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[Consumer] Receive message:")
        print("           {}: {}".format(method_frame.routing_key, body))
        time.sleep(1)
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        print("[Consumer] Reject message and return it to queue!")
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=True)
    return


def msg_publisher(channel, *, exchange, routing_key):
    # Send a message
    data = "hahahahhahahahaha! I'm a bug and you can't catch me!"
    if channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=data,
            properties=pika.BasicProperties(
                content_type='text/plain',
                delivery_mode=1),
            mandatory=True):
        print('[Producer] Message was published')
    else:
        print('[Producer] Message was returned')
        
if __name__ == "__main__":
    client = MqClient(
        mq_host="172.16.110.17",
        mq_port=5672,
        username="guest",
        password="guest",
    )
    channel = client.get_channel()

    # 设置生产者
    msg_publisher(channel,
                  exchange=EXCHANGE_NAME,
                  routing_key="hdls.miao.message")
    # 设置消费者
    channel.basic_consume(
        msg_consumer,
        queue=QUEUE_NAME,
        no_ack=False,
        consumer_tag="hdls-consumer"
    )
    # 开始消费
    channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    client.connection.close()

消费者收到消息后进行收到 ack,得到的结果如下:

[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"

若对于该条消息,消费者不消费,而是拒绝:basic_nack,而拒绝的同时将参数设置为 requeue=True,即将消息打回队列,则得到的结果如下:

[Producer] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
...

此时该条消息就会一直被打回队列,就一直堵在队列中:

死信

当一个消息被拒绝而被打回队列,而此后该消息没有消费者接收,成了死信,就会堵住队列,当队列中死信越来越多时,队列的性能会受到影响。对于死信的处理,设置死信队列是个很好的选择。

死信通常有下面几种情况:

  1. 消息被拒绝(通过 basic.reject 方法或 basic.nack 方法),同时被打回队列;
  2. 消息本身设置了 TTL 或队列设置了 TTL,且达到了过期时间;
  3. 队列可持有消息数量达到了上限。

死信交换机

当消息在一个队列中成为死信时,就能够被发送到另一个交换机中,也就是死信交换机。死信交换机其实就是普通的交换机,不过绑定的是死信队列,其声明和使用与普通交换机一致。

死信队列

死信队列就是用来接收死信的队列,但其本质与普通队列一样。只不过在设置普通队列的时候需要给其定义死信交换机是哪个,当消息成为死信时,以什么样的 routing_key 来路由到死信队列里去。这样所有的死信就可以被路由到对应的死信队列中去了。

需要注意的是,在声明普通队列的死信设置之前,死信交换机和死信队列需要先存在。

根据定义将上面的普通队列做修改:

    def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在声明队列时,需要声明两个参数即可:x-dead-letter-exchangex-dead-letter-routing-key。同时在声明普通队列之前声明死信队列:

    def connect(self):
        self.create_exchange(DEAD_EXCHANGE_NAME)
        self.create_queue(DEAD_EXCHANGE_NAME,
                          DEAD_QUEUE_NAME, DEAD_ROUTING_KEY, is_dead=True)

        self.create_exchange(EXCHANGE_NAME)
        self.create_queue(EXCHANGE_NAME, QUEUE_NAME, ROUTING_KEY)

如果同时加上死信队列的消费者,就可以统一处理死信了:

def dead_msg_consumer(channel, method_frame, header_frame, body):
    try:
        print("[DEAD CSM] Dead Message! It's time to put it to dead queue.")
        print("           {}: {}".format(method_frame.routing_key, body))
        print("           ACK dead message!")
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)
    except:
        channel.basic_nack(delivery_tag=method_frame.delivery_tag,
                           multiple=False, requeue=False)
    return

万事俱备!但运行后居然报错:

pika.exceptions.ChannelClosed: (406, "PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'hdls.mq.queue' in vhost '/': received the value 'dead.exchange' of type 'longstr' but current is none")

这是因为之前我们已经声明过不加死信设置的队列了,声明 queue 时试图设定一个 x-dead-letter-exchange 参数,当前服务器上该 queue 的该参数为 none,服务器不允许所以报错。

此时有两种解决方法:一是在服务器上将之前的 queue 删除,加上死信参数,再次声明队列;二是通过 policy 来设置这个参数。

policy 可以用 rabbitmqctl set_policy设置,也可以在 RabbitMQ 的前端页面进行:

需要注意的是,通过 policy 方法设置时参数为 dead-letter-exchange dead-letter-routing-key,而用第一种方法中的死信参数需要加上 x- 前缀。

将死信设置加上后,再次启用 consumer:

[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
[Consumer] Reject message and return it to queue!
[DEAD CSM] Dead Message! It's time to put it to dead queue.
           dead.message: b"hahahahhahahahaha! I'm a bug and you can't catch me!"
           ACK dead message!

可以看到,消息在打回队列后就被路由到了死信队列。

延迟队列

所谓延迟队列,指的是消息发送后,并不想立即被消费者拿到,希望在指定时间后,消费者才拿到消息。

延迟队列可以用死信队列来实现。利用队列或消息的 TTL 特性,可以做到消息在指定时间内超时后被路由到死信队列,而此时死信队列就可以当做延迟队列来做消息处理。

    def create_queue(self, exchange_name, queue_name, routing_key,
                     is_dead=False):
        arguments = {}
        if not is_dead:
            arguments = {
                "x-message-ttl": 3000,
                "x-dead-letter-exchange": DEAD_EXCHANGE_NAME,
                "x-dead-letter-routing-key": DEAD_ROUTING_KEY,
            }
        self.channel.queue_declare(
            queue=queue_name,
            arguments=arguments,
        )
        self.queue_bind(exchange_name, queue_name, routing_key)

在普通队列的死信设置里加上一条 x-message-ttl 就可以设置消息的 TTL。

[Publisher] Message was published
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[Consumer] Receive message:
           hdls.miao.message: b"I'm not bug, but you can only receive me in 3 seconds."
[Consumer] Reject message and return it to queue!
[DELAY CSM] Delay queue receive the message!
            dead.message: b"I'm not bug, but you can only receive me in 3 seconds."
            ACK delay message!
2019/3/28 posted in  中间件

Kubernetes Job 与 CronJob

如果说 Deployment、DaemonSet 等资源为 Kubernetes 承担了长时间、在线计算的能力,那么定时、短期、甚至一次性的离线计算能力,便是 Job 和 CronJob 所承担的事情。

Job

Job 其实就是根据定义起一个或多个 pod 来执行任务,pod 执行完退出后,这个 Job 就完成了。所以 Job 又称为 Batch Job ,即计算业务或离线业务。

Job 使用方法

Job 的 YAML 定义与 Deployment 十分相似。与 Deployment 不同的是,Job 不需要定义 spec.selector 来指定需要控制的 pod,看个例子:

apiVersion: batch/v1
kind: Job
metadata:
  name: date
spec:
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "date > /date/date.txt"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

在这个 Job 中,我们定义了一个 Ubuntu 镜像的容器,用于将当前时间输出至宿主机的 /date/date.txt 文件中。将此 Job 创建好后,我们可以查看该 Job 对象:

可以看到,Job 在创建后被加上了 controller-uid=***** 的 Label,和与之对应的 Label Selector,从而保证了 Job 与它所管理的 Pod 之间的匹配关系。查看 pod 可以看到相同的 Label:

pod 在执行完毕后,状态会变成 Completed,我们可以去 pod 被调度的 node 上查看我们挂载进去的 date.txt 文件:

[root@rancher-node3 ~]# cat /date/date.txt
Sat Dec 22 16:09:48 UTC 2018

pod 重启策略

在 Job 中,pod 的重启策略 restartPolicy 不允许被设置成 Always,只允许被设置为 Never 或 OnFailure。这是因为 Job 的 pod 执行完毕后直接退出,如果 restartPolicy=Always,pod 将不断执行计算作业,这可不是我们期望的。

Job 可以设置 pod 的最长运行时间 spec.activeDeadlineSeconds,一旦超过了这个时间,这个 Job 的所有 pod 都会被终止。

那么,如果 pod 的计算作业失败了,在不同的重启策略下会怎么办?

restartPolicy=Never

如果设置了 restartPolicy=Never,那么 Job Controller 会不断的尝试创建一个新的 pod 出来,默认尝试 6 次。当然这个值可以设置,即 Job 对象的 spec.backoffLimit 字段。

需要注意的是,重新创建 Pod 的间隔是呈指数增加的。

restartPolicy=OnFailure

如果设置了 restartPolicy=Never,那么 Job Controller 会不断的重启这个 pod。

Job 工作原理

通过观察 Job 的创建过程,不难看出 Job 维护了两个值 DESIRED 和 SUCCESSFUL,分别表示 spec.completions 和 成功退出的 pod 数。

而在 Job 对象中有两个参数意义重大,它们控制着 Job 的并行任务:
spec.parallelism :定义一个 Job 在任意时间最多可以启动同时运行的 Pod 数;
spec.completions :定义 Job 至少要完成的 Pod 数目,即 Job 的最小完成数。

弄清楚了这两个参数,我们再来看 Job 的工作原理。

首先,Job Controller 控制的直接就是 pod;
在整个 Job 的作业过程中,Job Controller 根据实际在 Running 的 pod 数、已成功退出的 pod 数、parallelism 值、completions 值,计算出当前需要创建或删除的 pod 数,去调用 APIServer 来执行具体操作。

就拿上面的例子说明,比如将 YAML 改成:

apiVersion: batch/v1
kind: Job
metadata:
  name: date
spec:
  parallelism: 2
  completions: 3
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "date >> /date/date.txt"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

第一步:判断当前没有 pod 在 Running,且成功退出 pod 数为 0,当前最多允许 2 个 pod 并行。向 APIServer 发起创建 2 个 pod 的请求。此时 2 个 pod Running,当这 2 个 pod 完成任务并成功退出后,进入第二步;

第二步:当前 Running pod 数为 0,成功退出数为 2,当前最多允许 2 个 pod 并行,Job 最小完成数为 3。则向 APIServer 发起创建 1 个 pod 的请求。此时 1 个 pod Running,当这个 pod 完成任务并成功退出后,进入第三步;

第三步:当前成功退出 pod 数为 3,Job 最小完成数为 3。判断 Job 完成作业。

批处理调度

根据 Job 的这些特性,我们就可以用以实现批处理调度,也就是并行启动多个计算进程去处理一批工作项。根据并行处理的特性,往往将 Job 分为三种类型,即 Job 模板拓展、固定 completions 数的 Job、固定 parallelism 数的 Job。

Job 模板拓展

这种模式最简单粗暴,即将 Job 的 YAML 定义成外界可使用的模板,再由外部控制器使用这些模板来生成单一无并行任务的 Job。比如,我们将上面的例子改写成模板:

apiVersion: batch/v1
kind: Job
metadata:
  name: date-$ITEM
spec:
  template:
    spec:
      containers:
      - name: date
        image: ubuntu:16.04
        command: ["sh", "-c", "echo item number $ITEM; date >> /date/date.txt; sleep 5s"]
        volumeMounts:
        - mountPath: /date
          name: date-volume
      restartPolicy: Never
      volumes:
      - name: date-volume
        hostPath:
          path: /date

而在使用的时候,只需将 $ITEM 替换掉即可:

cat job.yml | sed "s/\$ITEM/1/" > ./job-test.yaml

除了上面这张简单的基础模板使用,Kubernetes 官网还提供了一种以 jinja2 模板语言实现的多模板参数的模式:

{%- set params = [{ "name": "apple", "url": "http://www.orangepippin.com/apples", },
                  { "name": "banana", "url": "https://en.wikipedia.org/wiki/Banana", },
                  { "name": "raspberry", "url": "https://www.raspberrypi.org/" }]
%}
{%- for p in params %}
{%- set name = p["name"] %}
{%- set url = p["url"] %}
apiVersion: batch/v1
kind: Job
metadata:
  name: jobexample-{{ name }}
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
---
{%- endfor %}

在使用这种模式需要确保已经安装了 jinja2 的包:pip install --user jinja2

再执行一条 Python 命令即可替换:

alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
cat job.yaml.jinja2 | render_template > jobs.yaml

或者直接进行 kubectl create:

alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'
cat job.yaml.jinja2 | render_template | kubectl create -f -

固定 completions 数的 Job

这种模式就真正实现了并行工作模式,且 Job 的完成数是固定的。

在这种模式下,需要一个存放 work item 的队列,比如 RabbitMQ,我们需要先将要处理的任务变成 work item 放入任务队列。每个 pod 创建时,去队列里获取一个 task,完成后将其从队列里删除,直到完成了定义的 completions 数。

上图描述了一个 completions=6,parallelism=2 的 Job 的示意图。选择 RabbitMQ 来充当这里的工作队列;外部生产者产生 6 个 task ,放入工作队列中;在 pod 模板中定义 BROKER_URL,来作为消费者。一旦创建了这个 Job,就会以并发度为 2 的方式,去消费这些 task,直到任务全部完成。其 yaml 文件如下:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 6
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: myrepo/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
  restartPolicy: OnFailure

固定 parallelism 数的 Job

最后一种模式是指定并行度(parallelism),但不设置固定的 completions 的值。

每个 pod 去队列里拿任务执行,完成后继续去队列里拿任务,直到队列里没有任务,pod 才退出。这种情况下,只要有一个 pod 成功退出,就意味着整个 Job 结束。这种模式对应的是任务总数不固定的场景。

上图描述的是一个并行度为 2 的 Job。RabbitMQ 不能让客户端知道是否没有数据,因此这里采用 Redis 队列;每个 pod 去队列里消费一个又一个任务,直到队列为空后退出。其对应的 yaml 文件如下:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: myrepo/job-wq-2
  restartPolicy: OnFailure

CronJob

Kubernetes 在 v1.5 开始引入了 CronJob 对象,顾名思义,就是定时任务,类似 Linux Cron。先看个例子:

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: cron-date
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: date
            image: ubuntu:16.04
            command: ["sh", "-c", "date >> /date/date.txt"]
            volumeMounts:
            - mountPath: /date
              name: date-volume
          nodeSelector:
            kubernetes.io/hostname: rancher-node3
          volumes:
          - name: date-volume
            hostPath:
              path: /date
          restartPolicy: OnFailure

CronJob 其实就是一个 Job 对象的控制器,需要定义一个 Job 的模板,即 jobTemplate 字段;另外,其定时表达式 schedule 基本上照搬了 Linux Cron 的表达式:

# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │                                   7 is also Sunday on some systems)
# │ │ │ │ │
# │ │ │ │ │
# * * * * * 

创建出该 CronJob 对象后,CronJob 会记录下最近一次 Job 的执行时间:

[root@rancher-node1 jobs]# kubectl get cronjob cron-date
NAME        SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
cron-date   */1 * * * *   False     0        22s             15m
[root@rancher-node1 jobs]# kubectl get job
NAME                   DESIRED   SUCCESSFUL   AGE
cron-date-1545584220   1         1            2m
cron-date-1545584280   1         1            1m
cron-date-1545584340   1         1            23s
[root@rancher-node1 jobs]# kubectl get po
NAME                         READY   STATUS      RESTARTS   AGE
cron-date-1545584220-gzmzw   0/1     Completed   0          2m
cron-date-1545584280-bq9nx   0/1     Completed   0          1m
cron-date-1545584340-84tf2   0/1     Completed   0          27s

如果某些定时任务比较特殊,某个 Job 还没有执行完,下一个新的 Job 就产生了。这种情况可以通过设置 spec.concurrencyPolicy 字段来定义具体策略:

  1. concurrencyPolicy=Allow,这也是默认情况,这意味着这些 Job 可以同时存在;
  2. concurrencyPolicy=Forbid,这意味着不会创建新的 Pod,该创建周期被跳过;
  3. concurrencyPolicy=Replace,这意味着新产生的 Job 会替换旧的、没有执行完的 Job。

Kubernetes 所能容忍的 Job 创建失败数为 100,但是其失败时间窗口可以自定义。即通过字段 spec.startingDeadlineSeconds 可以用来设定这个时间窗口,单位为秒,也就是说在这个时间窗口内最大容忍数为 100,如果超过了 100,这个 Job 就不会再被执行。

2018/12/22 posted in  Kubernetes

Kubernetes 安全机制解读

在 Kubernetes 中,所有资源的访问和变更都是围绕 APIServer 展开的。比如说 kubectl 命令、客户端 HTTP RESTFUL 请求,都是去 call APIServer 的 API 进行的,本文就重点解读 k8s 为了集群安全,都做了些什么。

首先,Kubernetes 官方文档给出了上面这张图。描述了用户在访问或变更资源的之前,需要经过 APIServer 的认证机制、授权机制以及准入控制机制。这三个机制可以这样理解,先检查是否合法用户,再检查该请求的行为是否有权限,最后做进一步的验证或添加默认参数。

用户

Kubernetes 中有两种用户,一种是内置“用户” ServiceAccount,另一种我称之为自然人。

所谓自然人就是指区别于 pod 等资源概念的“人”,可以理解成实际操作 "kubectl" 命令的人。admin 可以分发私钥,但自然人可以储存类似 KeyStone 甚至包含账号密码的文件,所以 k8s 中没有对自然人以 API 对象描述之。

在典型的 Kubernetes 集群中,API 通常服务在 443 端口,APIServer 提供自签名证书。当你使用 kube-up.sh 创建集群用户时,证书会自动在 $USER/.kube/config 中创建出来,而后续用 kubectl 命令访问 APIServer 时,都是用这个证书。

与之相反,k8s 中以 API 对象的形式描述和管理 ServiceAccount。它们被绑定在某个具体的 namespace 中,可以由 APIServer 自动创建出来或手动 call k8s API。

认证机制(Authentication)

k8s 中的认证机制,是在用户访问 APIServer 的第一步。通常是一个完整的 HTTP 请求打过来,但是这一步往往只检测请求头或客户端证书。

认证机制目前有客户端证书、bearer tokens、authenticating proxy、HTTP basic auth 这几种模式。使用方式通常有以下几种:

  1. X509 Client Certs: 客户端证书模式需要在 kubectl 命令中加入 --client-ca-file=<SOMEFILE> 参数,指明证书所在位置。

  2. Static Token File: --token-auth-file=<SOMEFILE> 参数指明 bearer tokens 所在位置。

  3. bearer tokens: 在 HTTP 请求头中加入 Authorization: Bearer <TOKEN>

  4. Bootstrap Tokens: 与 bearer tokens 一致,但 TOKEN 格式为 [a-z0-9]{6}.[a-z0-9]{16}。该方式称为 dynamically-managed Bearer token,以 secret 的方式保存在 kube-system namespace 中,可以被动态的创建和管理。同时,启用这种方式还需要在 APIServer 中打开 --enable-bootstrap-token-auth ,这种方式还处于 alpha 阶段。

  5. Static Password File: 以参数 --basic-auth-file=<SOMEFILE> 指明 basic auth file 的位置。这个 basic auth file 以 csv 文件的形式存在,里面至少包含三个信息:password、username、user id,同时该模式在使用时需要在请求头中加入 Authorization: Basic BASE64ENCODED(USER:PASSWORD)

  6. Service Account Tokens: 该方式通常被 pod 所使用,在 PodSpec 中指明 ServiceAccount 来访问 ApiServer。

除了以上列出来的几种方式外,还有一些比较特殊的访问方式,这里不再详细解读。

授权机制(Authorization)

当用户通过认证后,k8s 的授权机制将对用户的行为等进行授权检查。换句话说,就是对这个请求本身,是否对某资源、某 namespace、某操作有权限限制。

授权机制目前有 4 种模式:RBAC、ABAC、Node、Webhook。下面对这 4 种模式分别做分析。

RBAC

Role-based access control (RBAC) 是基于角色的权限访问控制,通常是对于“内置用户”而言的。该模式是在 k8s v1.6 开发出来的。若要开启该模式,需要在 APIServer 启动时,设置参数 --authorization-mode=RBAC

RBAC 所使用的 API Group 是 rbac.authorization.k8s.io/v1beta1,直到 Kubernetes v1.8 后,RBAC 模块达到稳定水平,所使用的 API Group 为 rbac.authorization.k8s.io/v1

所谓基于角色的权限访问控制,就是对某个用户赋予某个角色,而这个角色通常决定了对哪些资源拥有怎样的权限。

ServiceAccount

首先来看看这个 “内置用户”,在大多时候我们都不使用 “自然人” 这个功能,而是使用 ServiceAccount,再对其他资源授予某个 ServiceAccount,就使得其能够以 “内置用户” 的身份去访问 APIServer。

创建一个 ServiceAccount 很简单,只需要指定其所在 namespace 和 name 即可。举个例子:

apiVersion: v1
kind: ServiceAccount
metadata:
  namespace: hdls
  name: hdls-sa

Role & Rolebinding

RBAC 中最重要的概念就是 RoleRoleBindingRole 定义了一组对 Kubernetes API 对象的操作权限,而 RoleBinding 则定义的是具体的 ServiceAccount 和 Role 的对应关系。

举个 Role 的例子如下:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
    namespace: hdls
    name: hdls-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]

其中:
namespace: 在这里仅限于逻辑上的“隔离”,并不会提供任何实际的隔离或者多租户能力;
rules:定义的是权限规则,允许“被作用者”,对 hdls 下面的 Pod 对象,进行 GET 和 LIST 操作;
apiGroups:为 "" 代表 core API Group;
resources:指的是资源类型,对此还可以进行详细的划分,指定可以操作的资源的名字,比如:

rules:
- apiGroups: [""]
  resources: ["configmaps"]
  resourceNames: ["my-config"]
  verbs: ["get"]

verbs: 指的是具体的操作,当前 Kubernetes(v1.11)里能够对 API 对象进行的所有操作有 "get", "list", "watch", "create", "update", "patch", "delete"。

再看 RoleBinding 的例子:

kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
    name: hdls-rolebinding
    namespace: hdls
subjects:
- kind: ServiceAccount
    name: hdls-sa
    apiGroup: rbac.authorization.k8s.io
roleRef:
    kind: Role
    name: hdls-role
    apiGroup: rbac.authorization.k8s.io

可以看到,这个 RoleBinding 对象里定义了一个 subjects 字段,即“被作用者”。它的类型是 ServiceAccount,就是上面创建的 sa。这个 subjects 还可以是 User 和 Group,User 是指 k8s 里的用户,而 Group 是指 ServiceAccounts。

roleRef 字段是用来直接通过名字,引用我们前面定义的 Role 对象(hdls-role),从而定义了 Subject 和 Role 之间的绑定关系。

此时,我们再用 kubectl get sa -n hdls -o yaml 命令查看之前的 ServiceAccount,就可以看到 ServiceAccount.secret,这是因为 k8s 会为一个 ServiceAccount 自动创建并分配一个 Secret 对象,而这个 Secret 就是用来跟 APIServer 进行交互的授权文件: TokenToken 文件的内容一般是证书或者密码,以一个 Secret 对象的方式保存在 etcd 当中。

这个时候,我们在我们的 Pod 的 YAML 文件中定义字段 .spec.serviceAccountName 为上面的 ServiceAccount name 即可声明使用。

如果一个 Pod 没有声明 serviceAccountName,Kubernetes 会自动在它的 Namespace 下创建一个名叫 default 的默认 ServiceAccount,然后分配给这个 Pod。然而这个默认 ServiceAccount 并没有关联任何 Role。也就是说,此时它有访问 APIServer 的绝大多数权限。

ClusterRole & ClusterRoleBinding

需要注意的是 Role 和 RoleBinding 对象都是 Namespaced 对象,它们只对自己的 Namespace 内的资源有效。

而某个 Role 需要对于非 Namespaced 对象(比如:Node),或者想要作用于所有的 Namespace 的时候,我们需要使用 ClusterRole 和 ClusterRoleBinding 去做授权。

这两个 API 对象的用法跟 Role 和 RoleBinding 完全一样。只不过,它们的定义里,没有了 Namespace 字段。

值得一提的是,Kubernetes 已经内置了很多个为系统保留的 ClusterRole,它们的名字都以 system: 开头。一般来说,这些系统级别的 ClusterRole,是绑定给 Kubernetes 系统组件对应的 ServiceAccount 使用的。

除此之外,Kubernetes 还提供了四个内置的 ClusterRole 来供用户直接使用:

cluster-admin:整个集群的最高权限。如果在 ClusterRoleBinding 中使用,意味着在这个集群中的所有 namespace 中的所有资源都拥有最高权限,为所欲为;如果在 RoleBinding 中使用,即在某个 namespace 中为所欲为。

admin:管理员权限。如果在 RoleBinding 中使用,意味着在某个 namespace 中,对大部分资源拥有读写权限,包括创建 Role 和 RoleBinding 的权限,但没有对资源 quota 和 namespace 本身的写权限。

edit:写权限。在某个 namespace 中,拥有对大部分资源的读写权限,但没有对 Role 和 RoleBinding 的读写权限。

view:读权限。在某个 namespace 中,仅拥有对大部分资源的读权限,没有对 Role 和 RoleBinding 的读权限,也没有对 seccrets 的读权限。

Aggregated ClusterRoles

在 Kubernetes v1.9 之后,ClusterRole 有一种新的定义方法,就是使用 aggregationRule 将多个 ClusterRole 合成一个新的 ClusterRole

首先看个 k8s 官网的例子:

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: monitoring
aggregationRule:
  clusterRoleSelectors:
  - matchLabels:
      rbac.example.com/aggregate-to-monitoring: "true"
rules: []

其中 rules 字段不必定义,会被 controller manager 自动填充。

可以看出 aggregationRule 就是将所有满足 label 条件的 ClusterRole 的合成一个 ClusterRole,而这个新的 ClusterRole 权限为其他总和。

Group

相对于 User 而言,k8s 还拥有“用户组”(Group)的概念,也就是一组“用户”的意思。而对于“内置用户” ServiceAccount 来说,“用户组”的概念也同样适用。

实际上,一个 ServiceAccount,在 Kubernetes 里对应的“用户”的名字是: system:serviceaccount:<ServiceAccount 名字 > ;而它对应的内置“用户组”的名字,就是 system:serviceaccounts:<Namespace 名字 >

对于 Group 的运用,我们举个例子,在 RoleBinding 里这样定义 subjects:

subjects:
- kind: Group
    name: system:serviceaccounts:hdls
    apiGroup: rbac.authorization.k8s.io

这就意味着这个 Role 的权限规则,作用于 hdls 里的所有 ServiceAccount。

而如果 Group 不指定 Namespace,即直接定义为 system:serviceaccounts,意味着作用于整个系统里的所有 ServiceAccount。

ABAC

Attribute-based access control (ABAC) 是基于属性的权限访问控制。若要开启该模式,需要在 APIServer 启动时,开启 --authorization-policy-file=<SOME_FILENAME>--authorization-mode=ABAC 两个参数。

其 policy 文件用来指定权限规则,必须满足每行都是一个 json 对象的格式。可以指定 user 或 group 为某个特定的对象,并描述其拥有的权限。

与 Yaml 文件一致,必须描述的属性有 apiVersion、kind、spec,而 spec 里描述了具体的用户、资源和行为。看个例子:

{"apiVersion": "abac.authorization.kubernetes.io/v1beta1", "kind": "Policy", "spec": {"user": "bob", "namespace": "projectCaribou", "resource": "pods", "readonly": true}}

这就描述了用户 bob 只有在 namespace projectCaribou 下对 pod 的读权限。类似的,这个 User 可以是某个人,也可以是 kubelet 或者某个 ServiceAccount,这里 ServiceAccount 需要写全,比如:system:serviceaccount:kube-system:default

如果是描述某个 namespace 下的所有人,需要用到 group,比如:

{"apiVersion": "abac.authorization.kubernetes.io/v1beta1", "kind": "Policy", "spec": {"group": "system:serviceaccounts:default", "readonly": true, "resource": "pods"}}

Node

Node 授权机制是一种特殊的模式,是 kubelet 发起的请求授权。开启该模式,需要开启参数 --authorization-mode=Node

通过启动 --enable-admission-plugins=...,NodeRestriction,...,来限制 kubelet 访问 node,endpoint、pod、service以及secret、configmap、PV 和 PVC 等相关的资源。

Webhook

Webhook 模式是一种 HTTP 回调模式,是一种通过 HTTP POST 方式实现的简单事件通知。该模式需要 APIServer 配置参数 –authorization-webhook-config-file=<SOME_FILENAME>,HTTP 配置文件的格式跟 kubeconfig 的格式类似。

# Kubernetes API version
apiVersion: v1
# kind of the API object
kind: Config
# clusters refers to the remote service.
clusters:
  - name: name-of-remote-authz-service
    cluster:
      # CA for verifying the remote service.
      certificate-authority: /path/to/ca.pem
      # URL of remote service to query. Must use 'https'. May not include parameters.
      server: https://authz.example.com/authorize

# users refers to the API Server's webhook configuration.
users:
  - name: name-of-api-server
    user:
      client-certificate: /path/to/cert.pem # cert for the webhook plugin to use
      client-key: /path/to/key.pem          # key matching the cert

# kubeconfig files require a context. Provide one for the API Server.
current-context: webhook
contexts:
- context:
    cluster: name-of-remote-authz-service
    user: name-of-api-server
  name: webhook

其中,Cluster 指需要回调的地方的客户端,指定其访问证书和 URL;user 指回调处访问的身份,指明其所需证书和 key;contexts 指回调的内容。

准入控制(Admission Controllers)

在一个请求通过了认证机制和授权认证后,需要经过最后一层筛查,即准入控制。这个准入控制模块的代码通常在 APIServer 中,并被编译到二进制文件中被执行。这一层安全检查的意义在于,检查该请求是否达到系统的门槛,即是否满足系统的默认设置,并添加默认参数。

准入控制以插件的形式存在,开启的方式为:
kube-apiserver --enable-admission-plugins=NamespaceLifecycle,LimitRanger ...

关闭的方式为:
kube-apiserver --disable-admission-plugins=PodNodeSelector,AlwaysDeny ...

常用的准入控制插件有:

  • AlwaysAdmit:允许所有请求通过,被官方反对,因为没有实际意义;
  • AlwaysPullImages:将每个 pod 的 image pull policy 改为 always,在多租户的集群被使用;
  • AlwaysDeny:禁止所有请求通过,被官方反对,因为没有实际意义;
  • DefaultStorageClass:为每个 PersistentVolumeClaim 创建默认的 PV;
  • DefaultTolerationSeconds:如果 pod 对污点 node.kubernetes.io/not-ready:NoExecutenode.alpha.kubernetes.io/unreachable:NoExecute 没有容忍,为其创建默认的 5 分钟容忍 notready:NoExecuteunreachable:NoExecute
  • LimitRanger:确保每个请求都没有超过其 namespace 下的 LimitRange,如果在 Deployment 中使用了 LimitRange 对象,该准入控制插件必须开启;
  • NamespaceAutoProvision:检查请求中对应的 namespace 是否存在,若不存在自动创建;
  • NamespaceExists:检查请求中对应的 namespace 是否存在,若不存在拒绝该请求;
  • NamespaceLifecycle:保证被删除的 namespace 中不会创建新的资源;
  • NodeRestriction:不允许 kubelet 修改 Node 和 Pod 对象;
  • PodNodeSelector:通过读取 namespace 的注解和全局配置,来控制某 namespace 下哪些 label 选择器可被使用;
  • PodPreset:满足预先设置的标准的 pod 不允许被创建;
  • Priority:通过 priorityClassName 来决定优先级;
  • ResourceQuota:保证 namespace 下的资源配额;
  • ServiceAccount:保证 ServiceAccount 的自动创建,如果用到 ServiceAccount,建议开启;

以上只列举了部分,详情请移步 Kubernetes 官方文档。

官方建议:

  • 版本 > v1.10:
--enable-admission-plugins=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota
  • v1.9
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,DefaultTolerationSeconds,MutatingAdmissionWebhook,ValidatingAdmissionWebhook,ResourceQuota
  • v1.6 - v1.8
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,PersistentVolumeLabel,DefaultStorageClass,ResourceQuota,DefaultTolerationSeconds
  • v1.4 - v1.5
--admission-control=NamespaceLifecycle,LimitRanger,ServiceAccount,DefaultStorageClass,ResourceQuota
2018/11/26 posted in  Kubernetes

Kubernetes 入门之网络详解

Service 是 k8s 网络部分的核心概念,在 k8s 中,Service 主要担任了四层负载均衡的职责。本文从负载均衡、外网访问、DNS 服务的搭建及 Ingress 七层路由机制等方面,讲解 k8s 的网络相关原理。

Service 详解

Service 是主要用来实现应用程序对外提供服务的机制。

如上图所示,Service 是对 Pod 的一层抽象,主要通过 TCP/IP 机制及监听 IP 和端口号来对外提供服务。与 Pod 不同的是,Service 一旦创建,系统会为其分发一个 ClusterIP (也可以自己指定),且在其生命周期内不会发生变化。

Service 的创建

命令行快速创建

在创建好 RC 后,可以通过命令行 kubectl expose 来快速创建一个对应的 Service 。比如现已有一个名为 hdls 的 rc:

kubectl expose rc hdls

这种方式创建出来的 Service,其 ClusterIP 是系统自动为其分配的,而 Service 的端口号是从 Pod 中的 containerPort 复制而来。

通过 YAML 创建

apiVersion: v1
kind: Service
metadata:
  name: hdls
spec:
  ports:
  - port: 8080   # Service 的虚拟端口
    targetPort: 8000  # 指定后端 Pod 的端口号
  selector:  # Label 选择器
    app: hdls

定义好 YAML 文件后,通过命令 kubectl create -f <service.yml> 即可创建。Service 的定义需要指定以下几个关键字段:

  • ports
    • port: Service 的虚拟端口
    • targetPort: 后端 Pod 的端口号,若不填则默认与 Service 的端口一致
  • selector: Label 选择器,指定后端 Pod 所拥有的 Label

负载分发策略

k8s 提供了两种负载分发策略:

  • RoundRobin:轮询方式。即轮询将请求转发到后端的各个 Pod 上。
  • SessionAffinity:基于客户端 IP 地址进行会话保持模式。即相同 IP 的客户端发起的请求被转发到相同的 Pod 上。

在默认情况下,k8s 采用轮询模式进行路由选择,但我们也可以通过将 service.spec.SessionAffinity 设置为 “ClusterIP” 来启用 SessionAffinity 模式。

一些特殊情况

Headless Service

在这种情况下,k8s 通过 Headless Service 的概念来实现,即不给 Service 设置 ClusterIP (无入口 IP),仅通过 Label Selector 将后端的 Pod 列表返回给调用的客户端。

apiVersion: v1
kind: Service
metadata:
  name: hdls
spec:
  ports:
  - port: 8080   
    targetPort: 8000 
  clusterIP: None
  selector: 
    app: hdls

该 Service 没有虚拟的 ClusterIP ,对其访问可以获得所有具有 app=hdls 的 Pod 列表,客户端需要实现自己的负责均衡策略,再确定具体访问哪一个 Pod。

无 LabelSelector Service

一般来说,应用系统需要将外部数据库作为后端服务进行连接,或另一个集群或 namespace 中的服务作为后端服务。这些情况,可以通过建立一个无 Label Selector 的 Service 来实现:

apiVersion: v1
kind: Service
metadata:
  name: hdls
spec:
  ports:
  - port: 8080   
    targetPort: 8000 

该 Service 没有标签选择器,即无法选择后端 Pod。这时系统不会自动创建 Endpoint,需要手动创建一个与该 Service 同名的 Endpoint,用于指向实际的后端访问地址。

apiVersion: v1
kind: Endpoints
metadata:
  name: hdls  # 与 Service 同名
subsets:
  - addresss:
   - IP: 1.2.3.4   # 用户指定的 IP 
   ports:
   - port: 8000

此时,如上面的 YAML 创建出来 Endpoint,访问无 Label Selector 的 Service ,即可将请求路由到用户指定的 Endpoint 上。

多端口的 Service

在 service.spec.ports 中定义多个 port 即可,包括指定 port 的名字和协议。

apiVersion: v1
kind: Service
metadata:
  name: hdls
spec:
  ports:
  - name: dns
    port: 8080   
    protocol: TCP
  - name: dns-tcp
    port: 8080 
    protocol: UDP
  selector: 
    app: hdls

外网访问

Pod 和 Service 都是 k8s 集群内部的虚拟概念,所以集群外的客户无法访问。但在某些特殊条件下,我们需要外网可以访问 Pod 或 Service,这时我们需要将 Pod 或 Service 的端口号映射到宿主机,这样客户就可以通过物理机访问容器应用。

外网访问 Pod

将容器应用的端口号映射到物理机上。有两种方式,如下。

设置容器级别的 hostPort

这种是将容器应用的端口号映射到物理机。设置如下:

apiVersion: v1
kind: Pod
metadata:
  name: hdls-pod
spec:
  containers:
  - name: hdls-container
   image: ***
   ports:
   - containerPort: 8000
     hostPort: 8000

设置 Pod 级别的 hostNetwork=true

这种是将该 Pod 中所有容器端口号都直接映射到物理机上。此时需要注意的是,在容器的 ports 定义部分,若不指定 hostPort,默认 hostPort=containerPort,若设置了 hostPort,则 hostPort 必须等于 containerPort。设置如下:

apiVersion: v1
kind: Pod
metadata:
  name: hdls-pod
spec:
  hostNetwork: true
  containers:
  - name: hdls-container
   image: ***
   ports:
   - containerPort: 8000

外网访问 Service

也有两种方式。

设置 nodePort 映射到物理机

首先需要设置 nodePort 映射到物理机,同时需要设置 Service 的类型为 NodePort:

apiVersion: v1
kind: Service
metadata:
  name: hdls
spec:
  type: NodePort  # 指定类型为 NodePort
  ports:
  - port: 8080 
    targetPort: 8000 
    nodePort: 8000   # 指定 nodePort
  selector: 
    app: hdls

设置 LoadBalancer 映射到云服务商提供的 LoadBalancer 地址

这种用法仅用于在公有云服务提供商的云平台上设置 Service 的场景。需要将 service.status.loadBalancer.ingress.ip 设置为云服务商提供的负载均衡器的 IP。则对该 Service 的访问请求将会通过 LoadBalancer 转发到后端 Pod,且负载均衡的实现方式依赖于云服务商提供的 LoadBalancer 的实现机制。

DNS 搭建

为了能够实现通过服务名在集群内部进行服务的相互访问,需要创建一个虚拟的 DNS 服务来完成服务名到 ClusterIP 的解析。

k8s 提供的 DNS

k8s 提供的 DNS 服务名为 skydns,由下面四个组件组成:

  • etcd: DNS 存储;
  • kube2sky: 将 k8s Master 中的 Service 注册到 etcd ;
  • skyDNS: DNS 域名解析服务;
  • healthz: 对 skyDNS 的健康检查。

skyDNS 服务由一个 RC 和一个 Service 组成。在 RC 的配置文件中,需要定义 etcd / kube2sky / skydns / healthz 四个容器,以保证 DNS 服务正常工作。需要注意的是:

  1. kube2sky 容器需要访问 k8s Master,所以需要在配置文件中为其配置 Master 所在物理主机的 IP 地址和端口;
  2. 需要将 kube2sky 和 skydns 容器的启动参数 --domain 设置为 k8s 集群中 Service 所属域名。容器启动后 kube2sky 会通过 API Server 监控集群中所有 service 的定义,生成相应的记录并保存到 etcd ;
  3. skydns 的启动参数 -addr=<IP:Port> 表示本机 TCP 和 UDP 的 Port 端口提供服务。

在 DNS Service 的配置文件中,skyDNS 的 ClusterIP 需要我们指定,每个 Node 的 kubelet 都会使用这个 IP 地址,不会通过系统自动分配;另外,这个 IP 需要在 kube-apiserver 启动参数 --service-cluster-ip-range 内。

在 skydns 容器创建之前,需要先修改每个 Node 上 kubelet 的启动参数:

  • --cluster_dns= ,dns_cluster_ip 为 DNS 服务的 ClusterIP ;
  • --cluster_domain= , dns_domain 为 DNS 服务中设置的域名。

DNS 工作原理

  1. 首先 kube2sky 容器应用通过调用 k8s Master 的 API 获得集群中所有 Service 信息,并持续监控新 Service 的生成,写入 etcd;
  2. 根据 kubelet 的启动参数的设置,kubelet 会在每个新创建的 Pod 中设置 DNS 域名解析配置文件 /etc/resolv.conf 中增加一条 nameserver 配置和 search 配置,通过 nameserver 访问的实际上就是 skydns 在对应端口上提供的 DNS 解析服务;
  3. 最后,应用程序就可以像访问网站域名一样,仅通过服务的名字就能访问服务了。

Ingress

Service 工作在 TCP/IP 层,而 Ingress 将不同的 URL 访问请求转发到后端不同的 Service ,实现 HTTP 层的业务路由机制。而在 k8s 中,需要结合 Ingress 和 Ingress Controller ,才能形成完整的 HTTP 负载均衡。

Ingress Controller

Ingress Controller 用来实现为所有后端 Service 提供一个统一的入口,需要实现基于不同 HTTP URL 向后转发的负载分发规则。Ingress Controller 以 Pod 的形式运行,需要实现的逻辑:

  • 监听 APIServer,获取所有 Ingress 定义;
  • 基于 Ingress 的定义,生成 Nginx 所需的配置文件 /etc/nginx/nginx.conf
  • 执行 nginx -s reload ,重新加载 nginx.conf 配置文件的内容。

定义 Ingress

k8s 中有一种单独的名为 Ingress 的资源,在其配置文件中可以设置到后端 Service 的转发规则。比如,为 hdls.me 定义一个 ingress.yml:

apiVersion: extensions/v1beta1
kind: Ingress
metadata: 
  name: hdls-ingress
spec:
  rules:
  - host: hdls.me
   http:
     paths:
     - path: /web
       backend:
         serviceName: hdls
         servicePort: 8000

最后采用 kubectl create -f ingress.yml 创建 Ingress。可以登录 nginx-ingress Pod 查看其自动生成的 nginx.conf 配置文件内容。

2018/9/23 posted in  Kubernetes