Python 协程

2018/09/14 21:08 下午 posted in  Python

协程(Coroutine)又称微线程,即轻量级的线程。协程可以理解成与调用方协作,产出由调用方提供的值的过程。与线程相比,其优势在于上下文切换的成本更低,且由用户自己控制。

发展史

Python 中的协程主要经历了三个阶段。协程最开始是在 Python 2.5 中实现的,由生成器变形而来,以关键词 yield/send 等实现;引入 yield from,可以把复杂的生成器重构成小型的嵌套生成器;Python 3.5 中引入了 async/await 语法糖。

由于 yield from 已被移除 python 的语法,本文重点分析 yield/send 和 async/await 关键字是怎么实现协程的。

yield / send

协程的运行

在生成器中使用 yield 关键字,而后生成器的调用方使用 .send(value) 方法发送数据,该数据 value 就会成为生成器函数中 yield 表达式的值。换句话说,yield 是生成器中的一个暂停器,第一次调用时在 yield 处暂停,将 yield 右边的值 return 出去;下一次 send 进来的数据成为 yield 表达式的值。举个例子:

def count_num():
    r = 0
    print("Started.")
    while True:
        x = yield r
        print("Received x: {}".format(x))
        r = r + 1


if __name__ == "__main__":
    coroutine = count_num()
    next(coroutine)

    for i in "hello":
        t = coroutine.send(i)
        print("Coroutine times: {}".format(t))
    coroutine.close()

运行结果如下:

Started.
Received x: h
Coroutine times: 1
Received x: e
Coroutine times: 2
Received x: l
Coroutine times: 3
Received x: l
Coroutine times: 4
Received x: o
Coroutine times: 5

由此可以看出,局部变量 r 的值没有随协程的暂停而改变,可知协程中的局部变量保持在一个上下文中。这也是使用协程的一个好处,无需使用类对象的属性或闭包在多次调用中保持在上下文中。

另外,这里还需要注意的是,next(coroutine) 这一处的意思是先调用协程使其先运行到 yield 处进行第一次暂停,使协程处于暂停状态。之后协程再 send 时,才能生效,这一举动称为“预激”。

协程的状态总共有 4 种,分别为:
GEN_CREATED :等待开始执行状态
GEN_RUNNING :解释器正在执行
GEN_SUSPENDED :在 yield 表达式处暂停
GEN_CLOSED :执行结束

协程预激除了 next() 方法,还可以使用 .send(None) 方法,效果一样。如果将上述例子中预激的代码注释掉,运行后会报错:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    t = coroutine.send(i)
TypeError: can't send non-None value to a just-started generator

错误栈中表达的很清楚:在生成器还处于开始状态时,不可 send 不为 None 的值。

协程异常处理

协程中若出现未处理的异常,会向上传至 next 或 send 的调用方,且此时协程停止。而我们大多时候需要协程内部在出现异常时不退出,这时候通常的处理方法是 throw 方法。

throw 可以使协程抛出指定的异常,而不影响其运行的流程,协程依然在 yield 处暂停。在上述的例子中加入异常处理的功能:

class Error(Exception):
    pass


def count_num():
    r = 0
    print("Started.")
    while True:
        try:
            x = yield r
            print("Received x: {}".format(x))
        except Error:
            print("Coroutine error.")
        r = r + 1


if __name__ == "__main__":
    coroutine = count_num()
    next(coroutine)

    n = 0
    for i in "hello":
        n = n + 1
        if n % 2 == 0:
            coroutine.throw(Error)
        else:
            t = coroutine.send(i)
            print("Coroutine times: {}".format(t))
    coroutine.close()

运行结果:

Started.
Received x: h
Coroutine times: 1
Coroutine error.
Received x: l
Coroutine times: 3
Coroutine error.
Received x: o
Coroutine times: 5

协程处理异常除了用 throw 方法,还可以用 send 方法传入一个非法的值,比如常用的 None,这个也称为哨值。将上述代码中的 coroutine.throw(Error) 换成 coroutine.send(None) 也是一样的效果。

上述代码中最后调用了 close 方法,将协程的状态切换成 GEN_CLOSED。该方法的原理是在 yield 暂停处抛出 GeneratorExit 异常,若协程调用方没有处理这个异常或抛出了 StopIteration 异常,则不做处理,且将其状态切换成 GEN_CLOSED

async / await

从 python3.5 开始,Python 新加了一种协程定义方法 async def。简单的讲,async 定义一个协程,await 用于挂起阻塞的异步调用接口;而协程的调用方法在 Python3.7 中做了些许改动,所以这一节以 Python 版本分成两部分来讲解。

python 3.5 - 3.6

阅读协程的官方文档就会知道:协程本身无法运行,只有将其置于事件循环(event_loop)中才能运行其代码。那么事件循环是什么?在源码中的定义为:

# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    loop_pid = (None, None)


_running_loop = _RunningLoop()

event_loop 继承了 threading.local,创建一个全局 ThreadLocal 对象。后续将协程推进这个 loop,只有 loop 是 running 的,协程才得以执行。

协程的执行

执行协程首先需要将协程包成 future 或 task 再推进 event_loop;然后执行 loop.run_until_complete ,运行 loop 中所有协程。

这里 future 指一种对象,表示异步执行的操作;task 指对协程进一步封装,其中包含任务的各种状态,其中 task 是 future 的子类。

有两种方式:asyncio.ensure_futureloop.create_task 。但二者本质都是一样:将协程包成 future。下面两种实现方式效果一样:

async def count_num(num):
    print("count num: {}".format(num))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    future = asyncio.ensure_future(count_num(100))  
    loop.run_until_complete(future)
    loop.close()
async def count_num(num):
    print("count num: {}".format(num))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    task = loop.create_task(count_num(100))
    loop.run_until_complete(task)
    loop.close()

需要注意的是:也可以直接执行 loop.run_until_complete(coroutine),但这里是其实是先将 coroutine 包进了 ensure_future 里的。

协程的并发与阻塞

既然协程是为了异步而生,那么其异步执行肯定是重点。asyncio 的调用方法 asyncio.gather() 就可以将多个协程推进同一个事件循环。看个例子:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at".format(num), time.strftime('%X'))
    for i in range(num):
        time.sleep(1)
        print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
    print("Finish coroutine #{} at".format(num), time.strftime('%X'))


if __name__ == "__main__":
    print("Start.")
    loop = asyncio.get_event_loop()

    loop.run_until_complete(asyncio.gather(
        count_num(3),
        count_num(4),
    ))
    loop.close()

    print("Finish at", time.strftime('%X'))

该例子中调用了两个协程,功能是数数,从 0 开始数到传进去的数结束,每数一个数都会 sleep 1s。为了直观,每一步都将当前的时间打印出来。执行结果:

Start.
Started coroutine #3 at 11:51:10
[coroutine #3] count: 0 at 11:51:11...
[coroutine #3] count: 1 at 11:51:12...
[coroutine #3] count: 2 at 11:51:13...
Finish coroutine #3 at 11:51:13
Started coroutine #4 at 11:51:13
[coroutine #4] count: 0 at 11:51:14...
[coroutine #4] count: 1 at 11:51:15...
[coroutine #4] count: 2 at 11:51:16...
[coroutine #4] count: 3 at 11:51:17...
Finish coroutine #4 at 11:51:17
Finish at 11:51:17

通过执行结果可以看出,#3 和 #4 是分别执行,没有我们想要的并发的效果。而这里就需要 await 关键词来发挥作用了,await 可以将阻塞的协程挂起,让事件循环执行别的协程,直到其他协程挂起或执行完毕。我们将上例中的 sleep 进行修改:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at".format(num), time.strftime('%X'))
    for i in range(num):
        await asyncio.sleep(1)
        print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
    print("Finish coroutine #{} at".format(num), time.strftime('%X'))


if __name__ == "__main__":
    print("Start.")
    loop = asyncio.get_event_loop()

    loop.run_until_complete(asyncio.gather(
        count_num(3),
        count_num(4),
    ))
    loop.close()

    print("Finish at", time.strftime('%X'))

执行结果:

Start.
Started coroutine #3 at 11:59:16
Started coroutine #4 at 11:59:16
[coroutine #3] count: 0 at 11:59:17...
[coroutine #4] count: 0 at 11:59:17...
[coroutine #3] count: 1 at 11:59:18...
[coroutine #4] count: 1 at 11:59:18...
[coroutine #3] count: 2 at 11:59:19...
Finish coroutine #3 at 11:59:19
[coroutine #4] count: 2 at 11:59:19...
[coroutine #4] count: 3 at 11:59:20...
Finish coroutine #4 at 11:59:20
Finish at 11:59:20

python3.7

Python3.7 在原来的基础上对协程的执行做了一层封装,使得这个功能更加亲人。我们只需要定义我们需要的协程,然后调用 .run() 即可;在多协程的情况下,我们做出一个统一入口即可,看个例子:

import asyncio
import time


async def count_num(num):
    print("Started coroutine #{} at ".format(num), time.strftime('%X'))
    for i in range(num):
        await asyncio.sleep(1)
        print("[coroutine #{}] count: {} at ".format(num, i),
              time.strftime('%X'), "...")
    print("Finish coroutine #{} at ".format(num), time.strftime('%X'))


async def main():
    await asyncio.gather(
        count_num(3),
        count_num(4),
    )


if __name__ == "__main__":
    print("Start.")
    asyncio.run(main())
    print("Finish at ", time.strftime('%X'))

这段代码的结果与上面例子的结果一毛一样,可以看到调用起来就简单很多,这是因为绝大部分的逻辑(包括事件循环)都在 .run() 方法中替你封装好了。拜读一下源码:

def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()

然后这里有一点需要注意的是,.run() 函数不能在同一个线程已有事件循环的情况下调用,它始终会新建一个事件循环,并且在执行完所有的协程后将其关闭。