Python3: 异步 IO - asyncio 协程与任务、流、子进程、队列、同步

本文链接: https://xiets.blog.csdn.net/article/details/115557939

Python3 学习笔记(目录)

Python 异步 I/O 官方文档: Asynchronous I/O

asyncio 是用于编写 单线程内 并发 代码的库,使用 async/await 语法。

asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

asyncio 提供了一组 高层级 API 用于:

  1. 并发地 运行 Python 协程 并对其执行过程实现完全控制: Coroutines and Tasks(协程与任务)
  2. 执行 网络 IO 和 IPC: asyncio Streams(流)
  3. 控制 子进程: asyncio Subprocesses(子进程)
  4. 通过 队列 实现分布式任务: asyncio Queue(队列集)
  5. 同步 并发代码: asyncio Synchronization(同步原语)

asyncio 还有一些 低层级 API 以支持 库和框架的开发:事件循环、Futures、传输和协议、策略、平台支持。
低层级 API 一般只用于在库和框架的编写(底层库和框架可能需要更细致地使用 asyncio)。
应用层级的代码中应尽量使用 高层级 API。
指南与教程:高层级 API 索引、低层级 API 索引、用 asyncio 开发

1. 协程与任务: Coroutines and Tasks

协程 通过 async/await 语法进行声明,是编写 asyncio 应用的推荐方式。

async/await 语法:

  • async def:用于编写 协程函数;
  • await:等待 协程/任务 的执行完成。

“协程”可以用来表示两个紧密关联的概念:

  • 协程函数:定义形式为 async def 的函数;
  • 协程对象:调用 协程函数 所返回的对象。

1.1 协程 的简单示例

以下代码段(需要 Python 3.7+)会打印 “Hello”,等待 1 秒,再打印 “World”:

1>>> import asyncio 2>>> 3>>> async def main(): # 定义 协程函数 4... print("Hello") 5... await asyncio.sleep(1) # 阻塞协程, 但不阻塞线程 6... print("World") 7... 8>>> asyncio.run(main()) # 运行协程(Python 3.7+) 9Hello 10World 11 12

运行协程过程:调用 协程函数,返回 协程对象,使用 asyncio.run(...) 运行 协程对象

1>>> m = main() # 调用 协程函数, 返回 协程对象 2>>> asyncio.run(m) # 运行 协程对象(同一个对象只能被运行一次) 3Hello 4World 5 6

注意:简单地调用一个 协程 并不会使其被调度执行

1>>> main() 2<coroutine object main at 0x7f87ef8899c8> 3 4

1.2 运行协程

要真正运行一个协程,asyncio 提供了 3 种主要机制:

  1. asyncio.run() 函数用来运行最高层级的入口点 main() 函数(参考上面示例);
  2. await 等待一个协程(对象);
  3. asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。

以上 3 种方式均可使 协程 被调度执行。

1.2.1 主入口函数: asyncio.run()

函数原型: asyncio.run(coroutine, *, debug=False)

此函数是用来运行协程的最高层级的入口点 main() 函数(协程调度的主入口函数),执行 coroutine 并返回结果。

此函数会运行传入的协程(对象),负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。此函数总是会创建一个新的事件循环并在结束时关闭。

它应当被 用作 asyncio 程序的主入口点,理想情况下应当 只被调用一次

1import asyncio 2 3async def main(): # 定义 协程函数(协程调度的主入口函数) 4 await asyncio.sleep(1) 5 print("Hello World") 6 7asyncio.run(main()) # 在最高层级入口点运行协程 8 9

获取协程的运行返回:

1import asyncio 2 3async def main(): 4 await asyncio.sleep(1) 5 print("Hello World") 6 return "main over" 7 8ret = asyncio.run(main()) # run() 函数执行完的返回结果为 main() 协程函数的返回结果 9print(ret) # 输出: "main over" 10 11

1.2.2 await 可等待对象

如果一个对象可以在 await 语句中使用,那么该对象即为 可等待 对象,可以在其他协程中被等待。await 语句只能在 async 协程函数中使用。

可等待 对象有 3 种主要类型:协程任务Future

await 等待一个 协程(对象) 时,该协程将被运行,运行完成后等待才结束。

1import asyncio 2 3async def hello(): # 定义 协程函数 4 await asyncio.sleep(2) # 等待 2 秒 5 print("Hello") 6 7async def world(): # 定义 协程函数 8 await asyncio.sleep(3) # 等待 3 秒 9 print("World") 10 11async def main(): # 定义 协程函数(主入口函数) 12 print("main start") 13 await hello() # 运行协程 并 等待协程运行完毕 14 await world() # 运行协程 并 等待协程运行完毕 15 print("main end") 16 17asyncio.run(main()) # 运行协程(的主函数) 18 # main() 函数执行完毕后 run() 函数才执行完毕 19print("over") 20 21

输出结果(需要 5 秒):

1main start 2Hello 3World 4main end 5over 6 7

获取 await 等待的返回值:

1import asyncio 2 3async def hello(): 4 await asyncio.sleep(2) 5 return "Hello" 6 7async def world(): 8 await asyncio.sleep(3) 9 return "World" 10 11async def main(): 12 # 等待 协程 执行完毕后, await 语句的结果为 协程函数的返回值 13 ret1 = await hello() 14 ret2 = await world() 15 16 print(ret1) # 输出: "Hello" 17 print(ret2) # 输出: "World" 18 19asyncio.run(main()) 20 21

1.2.3 并发运行协程: asyncio.create_task()

前面演示的协程都是使用 await 等待同步运行,协程的真正的优势是异步并发运行,并发运行协程需要将协程封装为 async 任务。

创建 async 任务的函数原型: asyncio.create_task(coroutine, *, name=None)

此函数将 coroutine 协程(对象) 封装为一个 asyncio.Task 并立即自动调度其执行,同时(不等待协程运行完毕)返回 asyncio.Task 对象。一般需要在 async 协程函数中创建异步任务并等待。

此函数在 Python 3.7 中被加入。

任务还可以调用 Task.cancel(msg=None) 方法中途取消。

并发运行协程(任务)代码示例:

1import asyncio 2 3async def delay_print(time, text): # 定义 协程函数 4 await asyncio.sleep(time) 5 print(text) 6 7async def main(): # 定义 协程函数(主入口函数) 8 print("main start") 9 10 # 将协程封装为任务, 协程 将立即被 异步运行, create_task() 函数立即返回, 不阻塞 11 task1 = asyncio.create_task(delay_print(2, "Hello")) 12 task2 = asyncio.create_task(delay_print(3, "World")) 13 14 print("task create complete") 15 16 await task1 # 等待任务完成(阻塞) 17 await task2 # 等待任务完成(阻塞) 18 19 # await 语句返回结果为任务对应的协程函数 delay_print() 的返回值 20 21 print("main end") 22 23asyncio.run(main()) # 运行协程(的主函数) 24 25print("over") 26 27

输出结果(只需要 3 秒):

1main start 2task create complete 3Hello 4Hello 5main end 6over 7 8

1.3 异步上下文管理器: async with

官网相关链接: asynchronous-context-managers-and-async-with

普通 wtih 语句的上下文管理器的进入和退出的方法是 enterexit。async with 异步上下文管理器的进入和退出方法则为 aenteraexit(方法需使用 async def 声明为协程函数),能在其中暂停/等待协程任务(不阻塞线程)。异步上下文管理器 与 普通上下文管理器 原理相同(只不过进入和退出的方法需要使用 await 等待)。async with 语句需要在 async 协程函数使用。

async with 语句执行原理:

1async with obj as m: 2 # m.method(...) 3 pass 4 5# 1. obj 表示一个对象(或是一个表达式, 结果为一个对象) 6# 7# 2. 调用 obj 对象的 async def __aenter__() 方法并等待完成, 返回值赋值给 as 右边的变量 m, 8# 即: m = await obj.__aenter__() 9# 10# 3. 执行 with 代码块中的代码 11# 12# 4. 执行完 with 代码块中的代码后, 无论是否发生异常, 调用 obj 的 async def __aexit__() 方法并等待完成, 13# 即: await obj.__aexit__(...) 14 15

上面代码相当于:

1obj = ... 2m = await obj.__aenter__() 3try: 4 # m.method(...) 5 pass 6finally: 7 await obj.__aexit__(...) 8 9

支持 异步上下文管理器 的类的示例:

1import asyncio 2 3 4class AsyncContextManager: 5 6 async def __aenter__(self): 7 """ 8 执行 with 语句块时, 将先调用 __aenter__ 方法, 执行预先需要处理的代码, 9 返回值将赋值给 as 右边的变量(通常返回自己) 10 """ 11 print("__aenter__") 12 await asyncio.sleep(2) 13 return self 14 15 def test(self): 16 print("test") 17 18 async def __aexit__(self, exc_type, exc, tb): 19 """ 20 执行完 with 语句块后会调用 __aexit__ 方法, 执行清理工作, 21 22 如果 with 语句块中抛出异常, 则 exc_type, exc, tb 三个参数 23 分别表示抛出异常的 异常类型、异常对象的值、异常对象的 traceback 对象, 24 如果没有抛出异常, 则三个参数的值均为 None 25 """ 26 print("__aexit__", exc_type, exc, tb) 27 await asyncio.sleep(2) 28 29 30async def main(): 31 async with AsyncContextManager() as m: # obj = AsyncContextManager(); m = await obj.__aenter__() 32 m.test() # with 语句块中如果发生异常, 则执行完 __aexit__ 后抛给上层调用者 33 # other operating # await obj.__aexit__(...) 34 35 36asyncio.run(main()) 37 38 39# 执行后结果输出: 40# __aenter__ 41# test 42# __aexit__ None None None 43 44

1.4 异步迭代器: async for

官网相关链接: asynchronous-iterators-and-async-for

异步迭代器可以在其迭代器实现中调用异步代码(等待/暂停异步任务,不阻塞线程)。普通 for 迭代器的创建迭代器和迭代下一个元素的方法是 iternext。async for 异步迭代器的创建迭代器和迭代下一个元素的方法则为 aiteranext(方法需使用 async def 声明为协程函数),能在其中暂停/等待协程任务(不阻塞线程)。异步迭代器 与 普通迭代器 原理相同(只不过调用迭代下一个元素的方法需要使用 await 等待)。async for 语句需要在 async 协程函数使用。

普通 for 迭代器迭代结束抛出的是 StopIteration 异常,async for 异步迭代器迭代迭代抛出的异常为 StopAsyncIteration。

async for 语句执行原理:

1async for i in iterable_obj: 2 BLOCK 3 4

上面代码相当于:

1iterable_obj = ... # 可迭代对象 2iterator = iterable_obj.__aiter__() # 创建迭代器 3while True: 4 try: 5 i = await iterator.__anext__() # 迭代下一个元素, 等待返回 6 BLOCK 7 except StopAsyncIteration: 8 break # 抛出异常, 迭代结束 9 10

支持 异步迭代器 的类的示例:

1import asyncio 2 3 4class AsyncRange: 5 def __init__(self, stop): 6 self.stop = stop 7 8 def __aiter__(self): 9 """ 10 创建迭代器, 返回一个实现了 async def __anext__ 方法的对象, 11 此方法不能使用 async 修饰, 需要直接返回普通对象(非协程对象) 12 """ 13 self._i = 0 14 return self 15 16 async def __anext__(self): 17 """ 18 迭代返回下一个元素, 此方法需要使用 async 修饰 19 """ 20 if self._i >= self.stop: 21 raise StopAsyncIteration() # 抛出 StopAsyncIteration 异常, 表示没有更多元素, 迭代结束 22 23 await asyncio.sleep(1) # 可以在方法中调用异步代码 24 25 n = self._i 26 self._i += 1 27 return n 28 29 30async def main(): 31 async for i in AsyncRange(10): 32 print(i) 33 34 35asyncio.run(main()) 36 37

1.5 休眠: asyncio.sleep()

await除了可以等待 协程任务 外,还可以等待 Future,调用 asyncio.sleep(delay) 函数返回的就是 Future 对象。

函数原型: coroutine asyncio.sleep(delay, result=None, *)

阻塞 delay 指定的秒数,如果指定了 result,则当协程完成时将其返回给调用者。sleep() 总是会挂起当前任务,以允许其他任务运行。

以下协程示例每秒显示一次时间,一共显示 5 次:

1import asyncio 2import time 3 4async def display_date(): 5 for i in range(5): 6 await asyncio.sleep(1) 7 print(time.strftime("%Y-%m-%d %H:%M:%S")) 8 9asyncio.run(display_date()) 10 11

asyncio.Future 对象:

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

1.6 并发运行任务: asyncio.gather()

并发运行任务函数原型:awaitable asyncio.gather(*aws, return_exceptions=False)

并发运行 aws 序列中的 可等待对象。如果 aws 中的某个可等待对象为协程,则它将被作为一个任务调度(自动运行协程)。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表,结果值的顺序与 aws 中可等待对象的顺序一致。

  • 如果 return_exceptions=False (默认),所引发的首个异常会立即传给等待 gather() 的任务,aws 序列中的其他可等待对象 不会被取消 并将继续运行。

  • 如果 return_exceptions=True,异常会和成功的结果一样处理,并聚合至结果列表。

  • 如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

1import asyncio 2 3 4async def delay_print(time, text): # 定义 协程函数 5 await asyncio.sleep(time) 6 print(text) 7 return f"PRINT: {text}" 8 9 10async def main(): # 定义 协程函数(主入口函数) 11 # 创建协程对象(不会自动调度运行) 12 task1 = delay_print(1, "ABC") 13 task2 = delay_print(3, "DEF") 14 task3 = delay_print(3, "UVW") 15 16 # 创建任务(会自动立即异步运行) 17 task4 = asyncio.create_task(delay_print(5, "XYZ")) 18 19 # 并发异步运行多个任务 20 # task1, task2, task3 协程将作为任务被调度运行 21 # task4 任务等待运行完毕 22 all_task = asyncio.gather(task1, task2, task3, task4) 23 24 # 等待所有并发任务的运行完毕(一共需要 5 秒) 25 rets = await all_task 26 27 print(rets) # 输出: ['PRINT: ABC', 'PRINT: DEF', 'PRINT: UVW', 'PRINT: XYZ'] 28 29 30asyncio.run(main()) # 运行协程(的主函数) 31 32

1.7 超时: asyncio.wait_for()

等待超时函数原型:coroutine asyncio.wait_for(aw, timeout, *)

等待 aw 可等待对象 完成,等待 timeout 秒数后超时。如果 aw 是一个协程,它将自动被作为任务调度运行。

timeout 可以可以 None、float 或 int,如果 timeout 为 None,则等待直到完成。

如果发生超时,任务将取消并抛出异常 asyncio.TimeoutError。

1import asyncio 2 3async def delay_print(text): 4 await asyncio.sleep(10.0) 5 print(text) 6 7async def main(): 8 try: 9 await asyncio.wait_for(delay_print("Hello"), 3.0) 10 except asyncio.TimeoutError: 11 print("time out") 12 13asyncio.run(main()) 14 15

1.8 在线程中运行: asyncio.to_thread()

函数原型:coroutine asyncio.to_thread(func, /, *args, **kwargs)

在子线程中异步运行 func 函数,其中 *args, **kwargs 将作为参数被传递给 func 函数。返回一个可被等待以获取 func 函数返回结果的协程。

一些 IO 密集型函数/方法 如果放在协程中运行,会阻塞协程任务的事件循环。因此可以将 IO 密集型函数/方法 放到子线程中执行。

1import asyncio 2import time 3import threading 4 5 6def blocking_io(text, t): 7 time.sleep(t) # 阻塞线程, 模拟 IO 密集型任务 8 print(f"text: {text}; Thread: {threading.current_thread()}") 9 return f"RET {text}" 10 11 12async def main(): 13 # 创建在子线程中运行的任务, blocking_io 方法将在子线程被调用, 不阻塞当前线程 14 t1 = asyncio.to_thread(blocking_io, text="T1", t=1) 15 t2 = asyncio.to_thread(blocking_io, text="T2", t=2) 16 17 # 并发执行任务(此时才会启动子线程真正执行任务), 等待所有任务执行完毕返回 18 ret = await asyncio.gather(t1, t2) 19 20 # 打印结果 21 print(f"ret: {ret}; Thread: {threading.current_thread()}") 22 23 24asyncio.run(main()) 25 26

输出:

1text: T1; Thread: <Thread(asyncio_0, started 123145466556416)> 2text: T2; Thread: <Thread(asyncio_1, started 123145483345920)> 3ret: ['RET T1', 'RET T2']; Thread: <_MainThread(MainThread, started 4488781312)> 4 5

asyncio.to_thread() 内部维护了一个线程池功能。当调用 await asyncio.to_thread() 提交一个任务到子线程时,如果没有现成的线程,会先创建一个子线程,然后在子线程中执行任务,任务执行完毕后不会马上结束线程,等再次调用 await asyncio.to_thread() 方法提交任务时会复用现有的子线程。如果在第一个任务还没有完成前,再次提交任务(即并发执行任务),此时没有空闲的线程,则会创建新的子线程并立即执行任务。

代码示例:

1import asyncio 2import time 3import threading 4 5 6def blocking_io(text, t): 7 time.sleep(t) 8 print(f"text: {text}; Thread: {threading.current_thread()}") 9 10 11async def main(): 12 # 创建子线程, 在子线程中执行任务 13 await asyncio.to_thread(blocking_io, text="T1", t=1) 14 # 复用现有的子线程执行任务 15 await asyncio.to_thread(blocking_io, text="T2", t=2) 16 17 print(f"main; Thread: {threading.current_thread()}") 18 19 20asyncio.run(main()) 21 22

输出:

1text: T1; Thread: <Thread(asyncio_0, started 123145511952384)> 2text: T2; Thread: <Thread(asyncio_0, started 123145511952384)> 3main; Thread: <_MainThread(MainThread, started 4329717248)> 4 5

此函数为 Python 3.9 新版功能

更多跨线程调度参考:跨线程调度、并发和多线程

2. 网络 IO: asyncio Streams

asyncio streams 是用于处理网络连接的支持 async/await 的 高层级 原语。asyncio streams 允许异步发送和接收数据(不阻塞线程)并等待其完成。

asyncio streams 的相关类和方法:

  • asyncio.start_server: 启动 Socket 服务
  • asyncio.open_connection: 建立(TCP)网络连接并返回一对 (reader, writer) 对象
  • asyncio.StreamReader: IO 流读取器对象
  • asyncio.StreamWriter: IO 流写入器对象

下面演示基于 asyncio streams 高层级 API 的 TCP 回显 服务端/客户端 开发示例:

使用 高层级 API asyncio.start_server() 函数实现 TCP 回显 服务端

1import asyncio 2 3 4async def client_connected_cb(reader, writer): 5 # reader: asyncio.StreamReader 6 # writer: asyncio.StreamWriter 7 8 # 获取 客户端 的 地址和端口 9 client_addr = writer.get_extra_info("peername") 10 11 # 读取客户端发来的数据 (bytes) 12 data = await reader.read(1024) 13 message = data.decode() 14 15 print(f"Client {client_addr}: {message}") 16 17 # 发送数据到客户端 (bytes) 18 writer.write(f"Your data = {message}".encode()) 19 # 等待直到可以适当地恢复写入流 20 await writer.drain() 21 22 # 关闭客户端连接 23 writer.close() 24 await writer.wait_closed() 25 26 27async def main(): 28 # 启动 TCP 服务, 监听指定端口, 有客户端连接时将回调 client_connected_cb 方法 29 server = await asyncio.start_server(client_connected_cb, "127.0.0.1", 8080) 30 31 # print(type(server)) # <class 'asyncio.base_events.Server'> 32 33 # 获取 server 监听的地址和端口 34 server_addr = server.sockets[0].getsockname() 35 print(f"Serving on {server_addr}") 36 37 async with server: 38 # 循环监听接收客户请求 39 await server.serve_forever() 40 41 # class asyncio.base_events.Server: 42 # async def __aenter__(self): 43 # return self 44 # async def __aexit__(self, *exc): 45 # self.close() 46 # await self.wait_closed() 47 48 49asyncio.run(main()) 50 51

使用 高层级 API asyncio.open_connection() 函数实现 TCP 回显 客户端

1import asyncio 2 3 4async def main(): 5 # 连接 TCP 服务端, 返回一对 IO 流读写对象 6 reader, writer = await asyncio.open_connection("127.0.0.1", 8080) 7 8 # 获取远端地址和端口 9 server_addr = writer.get_extra_info("peername") 10 11 # 发送数据到服务端 (bytes) 12 writer.write(b"Hello World") 13 14 # 读取服务端发送的数据 15 data = await reader.read(1024) 16 print(f"Server {server_addr}: {data.decode()}") 17 18 # 关闭连接 19 writer.close() 20 21 22asyncio.run(main()) 23 24

无论是服务端还是客户端,连接后都只返回了一对 (reader, writer) 对象,要获取其他连接信息(如地址和端口),需要使用 writer 的 get_extra_info(name, default=None) 方法获取。

除了 高层级 API 实现外,还可使用 低层级 API 实现 TCP 和 UDP 服务端/客户端,参考:

  • 低层级 API loop.create_server() 和 loop.create_connection() 实现 TCP 回显 服务端 和 客户端
  • 低层级 API loop.create_datagram_endpoint() 实现 UDP 回显 服务端 和 客户端

3. 子进程: asyncio Subprocesses

使用 高层级 async/await asyncio API 创建和管理子进程的两个 asyncio 方法:

1# 2# 运行 cmd shell 命令, 返回一个 asyncio.subprocess.Process 实例 3# 4coroutine asyncio.create_subprocess_shell(cmd, 5 stdin=None, 6 stdout=None, 7 stderr=None, 8 limit=None, 9 **kwds) 10 11 12# 13# 创建一个子进程, 返回一个 asyncio.subprocess.Process 实例 14# 15coroutine asyncio.create_subprocess_exec(program, 16 *args, 17 stdin=None, 18 stdout=None, 19 stderr=None, 20 limit=None, 21 **kwds) 22 23

asyncio.create_subprocess_shell(…) 和 asyncio.create_subprocess_exec(…) 异步调用后等待返回一个 asyncio.subprocess.Process 实例与子进程交互。

3.1 运行 shell 命令 示例

1import sys 2import asyncio 3 4 5async def run_cmd(cmd): 6 # 创建子进程 shell, 返回 Process 实例 7 proc = await asyncio.create_subprocess_shell( 8 cmd, 9 stdin=asyncio.subprocess.PIPE, 10 stdout=asyncio.subprocess.PIPE, 11 stderr=asyncio.subprocess.PIPE) 12 13 # 发送到子进程 stdin 的数据 (bytes), 没有则为 None 14 stdin_data = None 15 16 # 与进程交互: 17 # 1. 将数据 stdin_data 发送到到子进程的 stdin; 18 # 2. 从 stdout 和 stderr 读取数据, 直至到达 EOF; 19 # 3. 等待进程终结。 20 # 返回一个元素类型为 bytes 的元组 (stdout_data, stderr_data) 21 stdout_data, stderr_data = await proc.communicate(stdin_data) 22 23 # 获取子进程执行完毕后的返回码 24 print(f"[{cmd}] return code {proc.returncode}") 25 26 # 打印子进程输出到 标准输出(stdout) 和 标准错误(stderr) 的数据 27 if stdout_data: 28 print(f"[stdout]\n{stdout_data.decode()}") 29 if stderr_data: 30 print(f"[stderr]\n{stderr_data.decode()}", file=sys.stderr) 31 32 33async def main(): 34 # 异步运行 shell 命令 35 await run_cmd("ls -l") 36 37 # 并发运行多个 shell 命令 38 task1 = run_cmd("python3 -V") 39 task2 = run_cmd("pwd") 40 await asyncio.gather(task1, task2) 41 42 43asyncio.run(main()) 44 45

3.2 创建子进程 示例

先创建一个 Python 脚本文件: demo.py

1import sys 2 3 4def main(): 5 # 获取命令参数 6 argv = ", ".join(sys.argv) 7 8 # 从 stdin 读取一行数据 9 in_data = input() 10 11 # 打印输出数据到 stdout 12 print(f"argv = {argv}") 13 print(f"in_data = {in_data}") 14 15 16if __name__ == "__main__": 17 main() 18 19

创建一个 Python 的子进程运行 Python 脚本:

1import asyncio 2 3 4async def main(): 5 # 创建子进程, 返回 Process 实例 6 proc = await asyncio.create_subprocess_exec( 7 "python3", "demo.py", "Hello", "World", # 命令 和 参数 8 stdin=asyncio.subprocess.PIPE, # 子进程的 标准输入 类型 9 stdout=asyncio.subprocess.PIPE, # 子进程的 标准输出 类型 10 stderr=asyncio.subprocess.PIPE) # 子进程的 标准错误 类型 11 12 # 从 proc 中获取子进程的 stdin, stdout, stderr 13 # 创建子进程时必须传入这 3 个参数才有值, 否则为 None 14 stdin = proc.stdin 15 stdout = proc.stdout 16 stderr = proc.stderr 17 18 # 写出数据(bytes)到子进程的 stdin 19 stdin.write("你好".encode()) 20 # 必须 写入一个'\n' 或 写入EOF, 子进程才能从 stdin 完成一行数据的读取 21 stdin.write_eof() 22 # 等待直到可以适当地恢复写入流 23 await stdin.drain() 24 25 # 从子进程的的 stdout 和 stderr 读取数据(bytes) 26 stdout_data = await stdout.read() 27 stderr_data = await stderr.read() 28 29 # 等待子进程退出, 返回 code 30 returncode = await proc.wait() 31 32 # 打印数据 33 print(f"[returncode]\n{returncode}") 34 print(f"[stdout_data]\n{stdout_data.decode()}") 35 print(f"[stderr_data]\n{stderr_data.decode()}") 36 37 38asyncio.run(main()) 39 40

结果输出:

1[returncode] 20 3[stdout_data] 4argv = demo.py, Hello, World 5in_data = 你好 6 7[stderr_data] 8 9 10

4. 队列: asyncio Queue

asyncio.Queue 队列被设计成与线程版同步队列 queue 模块类似。asyncio.Queue 是基于协程的、专用于 async/await 代码的同步队列。

与 queue 一样,asyncio.Queue 主要有 3 种队列:

  • asyncio.Queue: 先进先出(FIFO)队列
  • asyncio.LifoQueue: 后进先出(FIFO)队列
  • asyncio.PriorityQueue: 具有优先级顺序的队列

LifoQueue 和 PriorityQueue 继承自 Queue。

asyncio.Queue 类中的属性和方法:

1# 创建一个先进先出(FIFO)队列 2# maxsize 表示队列的最大尺寸: 3# 如果 maxsize <= 0, 则不限制队列尺寸, 即可以无限大; 4# 如果 maxsize > 0, 则当队列尺寸达到 maxsize 时, 5# await put() 将被阻塞, 直到队列中某个元素被 get() 取出. 6class asyncio.Queue(maxsize=0, *) 7 8 9# 队列最大尺寸(可存放元素的数量) 10maxsize 11 12# 返回队列中的元素(任务)数量(不包括已被 get 取出但还没 task_done 的任务) 13qsize() 14 15# 队列是否为空(qsize==0), 为空返回 True, 否则返回 False 16empty() 17 18# 队列是否已满, 队列中元素数量(qsize)达到 maxsize 返回 True, 19# 如果 maxsize <= 0, 则永远返回 False 20full() 21 22 23# 从队列中删除并返回一个元素。如果队列为空, 则等待, 直到队列中有 put 新元素。 24coroutine get() 25 26# 不阻塞, 立即从队列中删除并返回一个元素。如果队列为空, 则抛出 QueueEmpty 异常。 27get_nowait() 28 29 30# 添加一个元素进队列。如果队列已满, 则等待, 直到队列中有任务被 get 取出(无需等待 task_done)。 31coroutine put(item) 32 33# 不阻塞, 立即添加一个元素进队列, 如果队列中没有立即可用的空闲插槽, 则抛出 QueueFull 异常。 34put_nowait(item) 35 36 37# 通知队列上一次 get() 取出的任务已完成, 38# 通过 get() 从队列中删除并取出任务后, 任务处于被取出但未完成状态, 39# 完成后必须要调用 task_done() 通知队列 40task_done() 41 42 43# 阻塞直队列中的所有元素都被接收并处理完毕, 即所有任务都被取出(变成空队列)并已 task_done, 44# 在 task_done() 方法中会检查是否还有未完成任务, 没有则通知 await join() 结束等待。 45coroutine join() 46 47

asyncio.LifoQueue 后进先出队列用法与 asyncio.Queue 相同,asyncio.PriorityQueue 优先级队列的元素(任务)通常通过 (priority_number, item) 元祖的形式 put 和 get。

4.1 Queue 先进先出(FIFO)示例

1import asyncio 2import random 3 4 5class Task: 6 def __init__(self, sleep, text): 7 self.__sleep = sleep 8 self.__text = text 9 10 async def do_task(self): 11 await asyncio.sleep(self.__sleep) 12 print(f"do task ({self.__text})") 13 14 15async def production_tasks(queue: asyncio.Queue[Task]): 16 for i in range(5): 17 # 创建任务 18 task = Task(random.uniform(1.0, 2.0), f"T{i}") 19 # 添加任务到队列中 20 await queue.put(task) 21 22 23async def consuming_tasks(queue: asyncio.Queue[Task]): 24 while True: 25 # 从队列中取出任务 26 task = await queue.get() 27 # 处理任务 28 await task.do_task() 29 # 通知队列任务已被处理完成 30 queue.task_done() 31 32 33async def main(): 34 # 创建 先进先出(FIFO)队列 35 queue = asyncio.Queue() 36 37 # 创建异步 生产任务 38 t1 = asyncio.create_task(production_tasks(queue)) 39 # 创建异步 消费任务 40 t2 = asyncio.create_task(consuming_tasks(queue)) 41 42 await t1 43 await t2 44 45 46asyncio.run(main()) 47 48

输出:

1do task (T0) 2do task (T1) 3do task (T2) 4do task (T3) 5do task (T4) 6 7

4.2 LifoQueue 后进先出(LIFO)示例

其他代码与上面 FIFO 示例一致,只需将创建队列改为 asyncio.LifoQueue:

1# 创建 后进先出(LIFO)队列 2queue = asyncio.LifoQueue() 3 4

输出:

1do task (T4) 2do task (T3) 3do task (T2) 4do task (T1) 5do task (T0) 6 7

4.3 PriorityQueue 优先级队列 示例

1import asyncio 2import random 3from typing import Tuple 4 5 6class Task: 7 def __init__(self, sleep, text): 8 self.__sleep = sleep 9 self.__text = text 10 11 async def do_task(self): 12 await asyncio.sleep(self.__sleep) 13 print(f"do task ({self.__text})") 14 15 16async def production_tasks(queue: asyncio.Queue[Tuple[int, Task]]): 17 for i in range(5): 18 # 创建任务 19 task = Task(random.uniform(1.0, 2.0), f"T{i}") 20 # 添加任务到队列中 (priority_number, task), 21 # priority_number 可以是 int 或 float, 22 # priority_number 越小越先被取出, 23 # 在队列中还未被取出的任务的 priority_number 不能相同 24 await queue.put((5 - i, task)) 25 26 27async def consuming_tasks(queue: asyncio.Queue[Tuple[int, Task]]): 28 while True: 29 # 从队列中取出任务 30 task = await queue.get() 31 # 处理任务 32 await task[1].do_task() 33 # 通知队列任务已被处理完成 34 queue.task_done() 35 36 37async def main(): 38 # 创建 优先级 队列 39 queue = asyncio.PriorityQueue() 40 41 # 创建异步 生产任务 42 t1 = asyncio.create_task(production_tasks(queue)) 43 # 创建异步 消费任务 44 t2 = asyncio.create_task(consuming_tasks(queue)) 45 46 await t1 47 await t2 48 49 50asyncio.run(main()) 51 52

输出:

1do task (T4) 2do task (T3) 3do task (T2) 4do task (T1) 5do task (T0) 6 7

5. 同步: asyncio Synchronization

asyncio 既然支持多任务并发执行,就会和多线程一样存在多个并发任务对共享资源访问以及并发代码块之间通讯的问题(即使是在同一个线程内),即 同步原语。asyncio 同步原语被设计为与 threading 线程模块类似。

asyncio 同步原语 的两个关键注意事项:

  • asyncio 原语不是线程安全的,因此不能在 asyncio 代码中使用线程的同步;
  • asyncio 同步原语的方法不支持 timeout 参数, 如果需要超时操作可以使用asyncio.wait_for()。

asyncio 具有以下基本的同步原语:

  • 同步锁: asyncio.Lock
  • 事件对象: asyncio.Event
  • 条件对象: asyncio.Condition
  • 信号量对象: asyncio.Semaphore
  • 绑定的信号量对象: asyncio.BoundedSemaphore

5.1 锁对象: Lock

asyncio.Lock 任务互斥锁,可以保证并发代码块对共享资源的独占访问。

asyncio.Lock 类的方法:

1# 创建一个互斥锁 2class asyncio.Lock(*) 3 4 5# 获取锁 6# 如果锁的状态为 locked, 则等待直至锁的状态变为 unlocked 7# 当有多个协程在 acquire() 中被阻塞等待解锁时, 最终只有一个协程能获取锁并将其设为 locked 8# 同一个协程获取到锁并上锁后, 直到解锁前, 不能再次调用 acquire() 获取锁(极容易造成死锁) 9coroutine acquire() 10 11 12# 释放锁 13# 如果锁为 locked, 则将其设置为 unlocked 14# 如果锁为 unlocked, 则抛出 RuntimeError 异常 15release() 16 17 18# 判断锁的状态, 如果锁为 locked 状态则返回 True 19locked() 20 21

asyncio.Lock 使用示例,下面代码需要 6 秒才能执行完:

1import asyncio 2 3 4async def async_task(lock: asyncio.Lock, text: str): 5 await lock.acquire() # 获取锁 6 try: 7 await asyncio.sleep(3) 8 print(f"do task: {text}") 9 finally: 10 if lock.locked(): 11 lock.release() # 释放锁 12 13 14async def main(): 15 lock = asyncio.Lock() 16 17 t1 = async_task(lock, "Hello") 18 t2 = async_task(lock, "World") 19 20 await asyncio.gather(t1, t2) # 并发运行两个任务 21 22 23asyncio.run(main()) 24 25

asyncio.Lock 支持 async with 语句:

1async with lock: 2 BLOCK 3 # 需要同步的相关代码... 4 5

上面代码相当于:

1await lock.acquire() 2try: 3 BLOCK 4 # 需要同步的相关代码... 5finally: 6 lock.release() 7 8

5.2 事件对象: Event

asyncio.Event 事件对象可以用来通知多个 asyncio 协程事件状态已改变,可用于 协程间的通讯。asyncio.Event 对象内部维护了一个标记变量,可用通过 set() 方法将其设置为 True,并通过 clear() 方法将其设置为 False。协程中通过 wait() 方法阻塞直至该标记被其他协程标记为 True。该标记初始值为 False。

asyncio.Event 类的方法:

1# 创建一个事件对象 2class asyncio.Event(*) 3 4 5# 等待直至事件标记被设置为 True, 6# 如果事件标记已被设置为 True, 则立即返回, 否则将阻塞直至其他协程调用 set() 7coroutine wait() 8 9 10# 设置事件, 将标记设置为 True, 11# 其他所有 wait() 等待此事件对象的协程将被立即唤醒 12set() 13 14 15# 清空事件, 将标记设置为 False, 16clear() 17 18 19# 判断此事件标记是否已被设置, 事件标记为 True 则返回 True 20is_set() 21 22

asyncio.Event 代码示例:

1import asyncio 2 3 4async def async_task_01(event: asyncio.Event): 5 await asyncio.sleep(5) 6 print("async_task_01") 7 event.set() # 将事件标记设置为 True 8 9 10async def async_task_02(event: asyncio.Event): 11 await event.wait() # 等待事件标记被其他协程设置为 True 12 print("async_task_02") 13 14 15async def main(): 16 event = asyncio.Event() # 创建事件对象 17 18 t1 = async_task_01(event) 19 t2 = async_task_02(event) 20 21 await asyncio.gather(t1, t2) # 并发运行两个协程 22 23 24asyncio.run(main()) 25 26

结果输出:

1async_task_01 2async_task_02 3 4

5.3 条件对象: Condition

asyncio.Condition 条件原语可被任务用于等待某个时间的发生,然后获取对共享资源的独占访问。Condition 条件对象需要 Lock 锁的支持,本质上 Condition 合并了 Event 和 Lock 的功能。

asyncio.Condition 类的方法:

1# 创建一个条件对象, 2# lock 参数为 asyncio.Lock 类型, 3# 如果 lock=None 则自动创建一个新的 Lock 对象 4class asyncio.Condition(lock=None, *) 5 6 7# 获取底层锁 8coroutine acquire() 9 10 11# 等待, 并释放底层锁。 12# 等待直至被其他协程 notify 唤醒, 被唤醒后会重新请求获取底层锁, 获取到底层锁后才能停止阻塞。 13# 调用此方法必须先获取到底层锁。 14coroutine wait() 15 16 17# 唤醒最多 n 个正在 wait() 等待此条件的协程, 如果没有任何协程在等待则此方法为空操作。 18# 调用此方法必须先获取到底层锁, 并在调用此方法后被快速释放。 19notify(n=1) 20 21 22# 唤醒所有正在 wait() 等待此条件的协程, 如果没有任何协程在等待则此方法为空操作。 23# 调用此方法必须先获取到底层锁, 并在调用此方法后被快速释放。 24notify_all() 25 26 27# 释放底层锁 28release() 29 30 31# 获取底层锁的锁定状态 32locked() 33 34 35# predicate 为可调用对象, 其返回值为 bool 类型。 36# 等待直到被唤醒, 并且 predicate() 返回 True 则结束阻塞。 37# 如果第一次调用 predicate() 就返回 True, 则直接返回, 不 wait()。 38# 即使被唤醒且获取到底层锁, 但如果 predicate() 返回 False, 则仍然会继续 wait()。 39# 40# 此方法的内部代码: 41# async def wait_for(self, predicate): 42# result = predicate() 43# while not result: 44# await self.wait() 45# result = predicate() 46# return result 47# 48coroutine wait_for(predicate) 49 50

asyncio.Condition 代码示例:

1import asyncio 2 3 4async def async_task_01(cond: asyncio.Condition): 5 await asyncio.sleep(3) 6 print("async_task_01") 7 8 await cond.acquire() # 获取底层锁 9 try: 10 cond.notify_all() # 唤醒所有正在等待此条件的协程(必须获取到锁后才能调用) 11 finally: 12 cond.release() # 释放底层锁 13 14 15async def async_task_02(cond: asyncio.Condition): 16 await cond.acquire() # 获取底层锁 17 try: 18 await cond.wait() # 等待 并 释放底层锁, 直到被其他协程唤醒(必须获取到锁后才能调用), 19 print("async_task_02") # 被唤醒后会重新请求获取底层锁, 获取到底层锁后才能停止阻塞 20 finally: 21 cond.release() # 释放底层锁 22 23 24async def main(): 25 cond = asyncio.Condition() # 创建 条件对象(自动创建底层锁) 26 27 t1 = async_task_01(cond) 28 t2 = async_task_02(cond) 29 30 await asyncio.gather(t1, t2) # 并发运行两个协程 31 32 33asyncio.run(main()) 34 35

asyncio.Condition 支持 async with 语句:

1async with cond: 2 cond.wait() 3 4

上面代码相当于:

1await cond.acquire() 2try: 3 await cond.wait() 4finally: 5 cond.release() 6 7

5.4 信号量对象: Semaphore

asyncio.Semaphore 信号量对象。信号量对象内部会管理一个计数器,该计数器会随着每次 调用 acquire() 递减,调用 release() 递增。计数器的值永远不会小于 0。当前调用 acquire() 时如果计数的值为 0,则将阻塞等待,直到其他协程调用了 release() 将计数器的值递增。

asyncio.Semaphore 类的方法:

1# 创建信号量对象。 2# value 参数表示计数器的初始值, 如果传入 小于 0 的值则抛出 ValueError 异常 3class asyncio.Semaphore(value=1, *) 4 5 6# 获取一个信号量。 7# 如果计数器的值 > 0, 则将其减一并立即返回。 8# 如果计数器的值 = 0, 则阻塞等待, 直到其他协程调用了 release() 将计数器的值递增。 9coroutine acquire() 10 11 12# 如果无法立即获取一个信号量(即 acquire() 会阻塞), 则返回 True。 13locked() 14 15 16# 释放一个信号量, 并将计数器的值加一。 17# 可以唤醒一个正在等待获取信号量的其他协程。 18release() 19 20 21# 不同于 BoundedSemaphore , Semaphore 允许 release() 调用次数多于 acquire(), 22# 但一般情况通常先 acquire() 再 release() 成对调用(计数器的初始值需 >= 1)。 23 24

asyncio.Semaphore 代码示例:

1import asyncio 2 3 4async def async_task_01(semaphore: asyncio.Semaphore): 5 while True: 6 await asyncio.sleep(1) 7 8 await semaphore.acquire() # 获取信号量, 获取成功后计数器减一 9 try: 10 print("async_task_01") 11 finally: 12 semaphore.release() # 释放信号量, 计数器加一 13 14 15async def async_task_02(semaphore: asyncio.Semaphore): 16 while True: 17 await asyncio.sleep(1) 18 19 await semaphore.acquire() # 获取信号量, 获取成功后计数器减一 20 try: 21 print("async_task_02") 22 finally: 23 semaphore.release() # 释放信号量, 计数器加一 24 25 26async def main(): 27 semaphore = asyncio.Semaphore(1) # 创建 信号量对象 28 29 t1 = async_task_01(semaphore) 30 t2 = async_task_02(semaphore) 31 32 await asyncio.gather(t1, t2) # 并发运行两个协程 33 34 35asyncio.run(main()) 36 37

asyncio.Semaphore 支持 async with 语句:

1async with semaphore: 2 BLOCK 3 4

上面代码相当于:

1await semaphore.acquire() 2try: 3 BLOCK 4finally: 5 semaphore.release() 6 7

5.5 绑定的信号量对象: BoundedSemaphore

asyncio.BoundedSemaphore 绑定的信号量对象,其继承自 Semaphore。BoundedSemaphore 是特殊版本的 Semaphore,如果在调用 release() 后导致计数器的值增加到初始化以上,则会抛出 ValueError 异常。也就是说 BoundedSemaphore 必须先 acquire() 再 release() 成对调用。

asyncio.BoundedSemaphore 类的构造方法:

1class asyncio.BoundedSemaphore(value=1, *) 2 3

代码交流 2021