python中的协程模块

协程简介

协程通过async/await语法进行声明,是编写asyncio应用的推荐方式。如下代码,打印’hello’,等待1秒,再打印’world’:

1import asyncio 2async def main(): 3 print('hello') 4 await asyncio.sleep(1) 5 print('world') 6 7asyncio.run(main()) 8 9# 输出: 10# hello 11# world 12 13

注意:简单地调用一个协程并不会使其被调用执行

1>>> main() 2<coroutine object main at 0x1053bb7c8> 3 4

要真正运行一个协程,asyncio提供了三种主要机制:
asyncio.run()函数用来运行最高层级的入口点"main()“函数
等待一个协程。以下代码段会在等待1秒后打印"hello”,然后再次等待2秒后打印"world":

1import asyncio 2import time 3 4async def say_after(delay, what): 5 await asyncio.sleep(delay) 6 print(what) 7 8async def main(): 9 print(f"started at {time.strftime('%X')}") 10 await say_after(1, 'hello') 11 await say_after(2, 'world') 12 print(f"finished at {time.strftime('%X')}") 13 14asyncio.run(main()) 15 16# 输出: 17started at 14:05:23 18hello 19world 20finished at 14:05:26 21 22

asyncio.create_task()函数用来并发运行作为asyncio任务的多个协程。
修改以上示例,并发运行两个say_after协程:

1import asyncio 2import time 3 4async def say_after(delay, what): 5 await asyncio.sleep(delay) 6 print(what) 7 8async def main(): 9 task1 = asyncio.create_task(say_after(1, 'hello')) 10 task2 = asyncio.create_task(say_after(2, 'world')) 11 print(f"started at {time.strftime('%X')}") 12 13 await task1 14 await task2 15 print(f'finished at {time.strftime("%X")}') 16 17asyncio.run(main()) 18# 输出: 19started at 14:13:43 20hello 21world 22finished at 14:13:45 23 24 25

注意:预期的输出显示代码段的运行时间比之前快了1秒

可等待对象

如果一个对象可以在await语句中使用,那么它就是可等待对象。许多asyncio API都被设计为接受可等待对象。可等待对象有三种主要类型:协程,任务和Future。

协程

python协程属于可等待对象,因此可以在其他协程中被等待:

1import asyncio 2 3async def nested(): 4 return 42 5 6async def main(): 7 print(await nested()) 8 9asyncio.run(main()) 10 11

重要:“协程”可用来表示两个紧密关联的概念:
协程函数:定义形式为async def的函数
协程对象:调用协程函数所返回的对象

asyncio也支持旧式的基于生成器的协程。

任务
任务被用来“并行的”调度协程
当一个协程通过asyncio.create_task()函数被封装为一个任务,该协程会被自动调度执行:

1import asyncio 2 3async def nested(): 4 return 42 5 6async def main(): 7 # Schedule nested() to run soon concurrently 8 # with "main()". 9 task = asyncio.create_task(nested()) 10 11 # "task" can now be used to cancel "nested()", or 12 # can simply be awaited to wait until it is complete: 13 await task 14 15asyncio.run(main()) 16 17

Future对象
Future是一种特殊的底层级可等待对象,表示一个异步操作的最终结果。
当一个Future对象被等待,这意味着协程将保持等待直到该Future对象在其他地方操作完毕。
在asyncio中需要Future对象以便允许通过async/await使用基于回调的代码。
通常情况下没有必要在应用层级的代码中创建Future对象。
Future对象有时会由库和某些asyncio API暴露给用户,用作可等待对象:

1async def main(): 2 await function_that_returns_a_future_object() 3 4 # this is also valid: 5 await asyncio.gather( 6 function_that_returns_a_future_object(), 7 some_python_coroutine() 8 ) 9 10

一个很好的返回对象的低层级函数的示例是loop.run_in_executor()。

运行asyncio程序

asyncio.run(coro, *, debug=False)
执行coroutine coro并返回结果。
此函数会运行传入的协程,负责管理asyncio事件循环,终结异步生成器,并关闭线程池。
当有其他asyncio事件循环在同一线程中运行时,此函数不能被调用。
如果debug为True,事件循环将以调试模式运行。
此函数总是会创建一个新的事件循环并在结束时关闭。它应当被用作asyncio程序的主入口点,理想情况下应当只被调用一次。

1async def main(): 2 await asyncio.sleep(1) 3 print('hello') 4 5asyncio.run(main()) 6 7

注解:asyncio.run()的源代码可以在Lib/asyncio/runners.py中找到。

创建任务

asyncio.create_task(coro, *, name=None)
将coro协程封装为一个Task并调度其执行。返回Task对象。
name不为None,它将使用Task.set_name()来设为任务的名称。
该任务会在get_running_loop()返回的循环中执行,如果当前线程没有在运行的循环则会引发RuntimeError。
此函数 在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。

1async def coro(): 2 ... 3 4# In Python 3.7+ 5task = asyncio.create_task(coro()) 6... 7 8# This works in all Python versions but is less readable 9task = asyncio.ensure_future(coro()) 10 11

3.7 新版功能.

在 3.8 版更改: 添加了 name 形参。

休眠

coroutine asyncio.sleep(delay, result=None, *, loop=None)
阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。

Deprecated since version 3.8, will be removed in version 3.10: loop 形参。

以下协程示例运行 5 秒,每秒显示一次当前日期:

1import asyncio 2import datetime 3 4async def display_date(): 5 loop = asyncio.get_running_loop() 6 end_time = loop.time() + 5.0 7 while True: 8 print(datetime.datetime.now()) 9 if (loop.time() + 1.0) >= end_time: 10 break 11 await asyncio.sleep(1) 12 13asyncio.run(display_date()) 14# 输出: 152021-10-21 15:00:45.559416 162021-10-21 15:00:46.572488 172021-10-21 15:00:47.580234 182021-10-21 15:00:48.592124 192021-10-21 15:00:49.605523 20 21

并发运行任务

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
并发 运行 aws 序列中的 可等待对象。

如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

Deprecated since version 3.8, will be removed in version 3.10: loop 形参。

1import asyncio 2 3async def factorial(name, number): 4 f = 1 5 for i in range(2, number + 1): 6 print(f"Task {name}: Compute factorial({number}), currently i={i}...") 7 await asyncio.sleep(1) 8 f *= i 9 print(f"Task {name}: factorial({number}) = {f}") 10 return f 11 12async def main(): 13 # Schedule three calls *concurrently*: 14 L = await asyncio.gather( 15 factorial("A", 2), 16 factorial("B", 3), 17 factorial("C", 4), 18 ) 19 print(L) 20 21asyncio.run(main()) 22 23# Expected output: 24# 25# Task A: Compute factorial(2), currently i=2... 26# Task B: Compute factorial(3), currently i=2... 27# Task C: Compute factorial(4), currently i=2... 28# Task A: factorial(2) = 2 29# Task B: Compute factorial(3), currently i=3... 30# Task C: Compute factorial(4), currently i=3... 31# Task B: factorial(3) = 6 32# Task C: Compute factorial(4), currently i=4... 33# Task C: factorial(4) = 24 34# [2, 6, 24] 35 36

注解 如果 return_exceptions 为 False,则在 gather() 被标记为已完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用 gather.cancel() 将不会取消任何其他可等待对象。

代码交流 2021