协程(Coroutine)又称微线程,即轻量级的线程。协程可以理解成与调用方协作,产出由调用方提供的值的过程。与线程相比,其优势在于上下文切换的成本更低,且由用户自己控制。
发展史
Python 中的协程主要经历了三个阶段。协程最开始是在 Python 2.5 中实现的,由生成器变形而来,以关键词 yield/send 等实现;引入 yield from,可以把复杂的生成器重构成小型的嵌套生成器;Python 3.5 中引入了 async/await 语法糖。
由于 yield from 已被移除 python 的语法,本文重点分析 yield/send 和 async/await 关键字是怎么实现协程的。
yield / send
协程的运行
在生成器中使用 yield 关键字,而后生成器的调用方使用 .send(value)
方法发送数据,该数据 value 就会成为生成器函数中 yield 表达式的值。换句话说,yield 是生成器中的一个暂停器,第一次调用时在 yield 处暂停,将 yield 右边的值 return 出去;下一次 send 进来的数据成为 yield 表达式的值。举个例子:
def count_num():
r = 0
print("Started.")
while True:
x = yield r
print("Received x: {}".format(x))
r = r + 1
if __name__ == "__main__":
coroutine = count_num()
next(coroutine)
for i in "hello":
t = coroutine.send(i)
print("Coroutine times: {}".format(t))
coroutine.close()
运行结果如下:
Started.
Received x: h
Coroutine times: 1
Received x: e
Coroutine times: 2
Received x: l
Coroutine times: 3
Received x: l
Coroutine times: 4
Received x: o
Coroutine times: 5
由此可以看出,局部变量 r
的值没有随协程的暂停而改变,可知协程中的局部变量保持在一个上下文中。这也是使用协程的一个好处,无需使用类对象的属性或闭包在多次调用中保持在上下文中。
另外,这里还需要注意的是,next(coroutine)
这一处的意思是先调用协程使其先运行到 yield 处进行第一次暂停,使协程处于暂停状态。之后协程再 send 时,才能生效,这一举动称为“预激”。
协程的状态总共有 4 种,分别为:
GEN_CREATED
:等待开始执行状态
GEN_RUNNING
:解释器正在执行
GEN_SUSPENDED
:在 yield 表达式处暂停
GEN_CLOSED
:执行结束
协程预激除了 next()
方法,还可以使用 .send(None)
方法,效果一样。如果将上述例子中预激的代码注释掉,运行后会报错:
Traceback (most recent call last):
File "test.py", line 15, in <module>
t = coroutine.send(i)
TypeError: can't send non-None value to a just-started generator
错误栈中表达的很清楚:在生成器还处于开始状态时,不可 send 不为 None 的值。
协程异常处理
协程中若出现未处理的异常,会向上传至 next 或 send 的调用方,且此时协程停止。而我们大多时候需要协程内部在出现异常时不退出,这时候通常的处理方法是 throw
方法。
throw
可以使协程抛出指定的异常,而不影响其运行的流程,协程依然在 yield 处暂停。在上述的例子中加入异常处理的功能:
class Error(Exception):
pass
def count_num():
r = 0
print("Started.")
while True:
try:
x = yield r
print("Received x: {}".format(x))
except Error:
print("Coroutine error.")
r = r + 1
if __name__ == "__main__":
coroutine = count_num()
next(coroutine)
n = 0
for i in "hello":
n = n + 1
if n % 2 == 0:
coroutine.throw(Error)
else:
t = coroutine.send(i)
print("Coroutine times: {}".format(t))
coroutine.close()
运行结果:
Started.
Received x: h
Coroutine times: 1
Coroutine error.
Received x: l
Coroutine times: 3
Coroutine error.
Received x: o
Coroutine times: 5
协程处理异常除了用 throw
方法,还可以用 send
方法传入一个非法的值,比如常用的 None
,这个也称为哨值。将上述代码中的 coroutine.throw(Error)
换成 coroutine.send(None)
也是一样的效果。
上述代码中最后调用了 close
方法,将协程的状态切换成 GEN_CLOSED
。该方法的原理是在 yield 暂停处抛出 GeneratorExit
异常,若协程调用方没有处理这个异常或抛出了 StopIteration
异常,则不做处理,且将其状态切换成 GEN_CLOSED
。
async / await
从 python3.5 开始,Python 新加了一种协程定义方法 async def
。简单的讲,async 定义一个协程,await 用于挂起阻塞的异步调用接口;而协程的调用方法在 Python3.7 中做了些许改动,所以这一节以 Python 版本分成两部分来讲解。
python 3.5 - 3.6
阅读协程的官方文档就会知道:协程本身无法运行,只有将其置于事件循环(event_loop)中才能运行其代码。那么事件循环是什么?在源码中的定义为:
# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
loop_pid = (None, None)
_running_loop = _RunningLoop()
event_loop 继承了 threading.local
,创建一个全局 ThreadLocal
对象。后续将协程推进这个 loop,只有 loop 是 running 的,协程才得以执行。
协程的执行
执行协程首先需要将协程包成 future 或 task 再推进 event_loop;然后执行 loop.run_until_complete
,运行 loop 中所有协程。
这里 future 指一种对象,表示异步执行的操作;task 指对协程进一步封装,其中包含任务的各种状态,其中 task 是 future 的子类。
有两种方式:asyncio.ensure_future
和 loop.create_task
。但二者本质都是一样:将协程包成 future。下面两种实现方式效果一样:
async def count_num(num):
print("count num: {}".format(num))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(count_num(100))
loop.run_until_complete(future)
loop.close()
async def count_num(num):
print("count num: {}".format(num))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
task = loop.create_task(count_num(100))
loop.run_until_complete(task)
loop.close()
需要注意的是:也可以直接执行 loop.run_until_complete(coroutine)
,但这里是其实是先将 coroutine
包进了 ensure_future
里的。
协程的并发与阻塞
既然协程是为了异步而生,那么其异步执行肯定是重点。asyncio
的调用方法 asyncio.gather()
就可以将多个协程推进同一个事件循环。看个例子:
import asyncio
import time
async def count_num(num):
print("Started coroutine #{} at".format(num), time.strftime('%X'))
for i in range(num):
time.sleep(1)
print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
print("Finish coroutine #{} at".format(num), time.strftime('%X'))
if __name__ == "__main__":
print("Start.")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
count_num(3),
count_num(4),
))
loop.close()
print("Finish at", time.strftime('%X'))
该例子中调用了两个协程,功能是数数,从 0 开始数到传进去的数结束,每数一个数都会 sleep 1s。为了直观,每一步都将当前的时间打印出来。执行结果:
Start.
Started coroutine #3 at 11:51:10
[coroutine #3] count: 0 at 11:51:11...
[coroutine #3] count: 1 at 11:51:12...
[coroutine #3] count: 2 at 11:51:13...
Finish coroutine #3 at 11:51:13
Started coroutine #4 at 11:51:13
[coroutine #4] count: 0 at 11:51:14...
[coroutine #4] count: 1 at 11:51:15...
[coroutine #4] count: 2 at 11:51:16...
[coroutine #4] count: 3 at 11:51:17...
Finish coroutine #4 at 11:51:17
Finish at 11:51:17
通过执行结果可以看出,#3 和 #4 是分别执行,没有我们想要的并发的效果。而这里就需要 await
关键词来发挥作用了,await
可以将阻塞的协程挂起,让事件循环执行别的协程,直到其他协程挂起或执行完毕。我们将上例中的 sleep
进行修改:
import asyncio
import time
async def count_num(num):
print("Started coroutine #{} at".format(num), time.strftime('%X'))
for i in range(num):
await asyncio.sleep(1)
print("[coroutine #{}] count: {} at".format(num, i), time.strftime('%X'), "...")
print("Finish coroutine #{} at".format(num), time.strftime('%X'))
if __name__ == "__main__":
print("Start.")
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
count_num(3),
count_num(4),
))
loop.close()
print("Finish at", time.strftime('%X'))
执行结果:
Start.
Started coroutine #3 at 11:59:16
Started coroutine #4 at 11:59:16
[coroutine #3] count: 0 at 11:59:17...
[coroutine #4] count: 0 at 11:59:17...
[coroutine #3] count: 1 at 11:59:18...
[coroutine #4] count: 1 at 11:59:18...
[coroutine #3] count: 2 at 11:59:19...
Finish coroutine #3 at 11:59:19
[coroutine #4] count: 2 at 11:59:19...
[coroutine #4] count: 3 at 11:59:20...
Finish coroutine #4 at 11:59:20
Finish at 11:59:20
python3.7
Python3.7 在原来的基础上对协程的执行做了一层封装,使得这个功能更加亲人。我们只需要定义我们需要的协程,然后调用 .run()
即可;在多协程的情况下,我们做出一个统一入口即可,看个例子:
import asyncio
import time
async def count_num(num):
print("Started coroutine #{} at ".format(num), time.strftime('%X'))
for i in range(num):
await asyncio.sleep(1)
print("[coroutine #{}] count: {} at ".format(num, i),
time.strftime('%X'), "...")
print("Finish coroutine #{} at ".format(num), time.strftime('%X'))
async def main():
await asyncio.gather(
count_num(3),
count_num(4),
)
if __name__ == "__main__":
print("Start.")
asyncio.run(main())
print("Finish at ", time.strftime('%X'))
这段代码的结果与上面例子的结果一毛一样,可以看到调用起来就简单很多,这是因为绝大部分的逻辑(包括事件循环)都在 .run()
方法中替你封装好了。拜读一下源码:
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
然后这里有一点需要注意的是,.run()
函数不能在同一个线程已有事件循环的情况下调用,它始终会新建一个事件循环,并且在执行完所有的协程后将其关闭。