为了提升性能,例如 tronado、fastapi、django 3.x asgi、aiohttp 等,越来越多的框架都在想异步方向发展。
本篇将讲解协程基础和 asyncio 相关内容,python 3.11 官方文档:协程与任务
# 协程
协程 ( coroutine
) 也可以被称为微线程,是一种用户态的上下文切换技术,简而言之,就是通过一个线程实现代码块相互切换执行。
与计算机本就提供的线程和进程不同,协程并非计算机提供。而是程序员人为创造的。
举个例子,对于如下代码,根据代码执行顺序,会先执行 func1,然后执行 func2,那如果我们希望在执行 func1 的过程中,切换到 func2,执行部分代码再切回 func1,该怎么做呢,这就需要用到协程。
def func1 (): | |
print (1) | |
... | |
print (2) | |
def func2 (): | |
print (3) | |
... | |
print (4) | |
func1 () | |
func2 () |
# 实现协程的方法
其实,实现协程有很多方法,例如
- 使用第三方模块:greenlet(早期模块)
- yield 关键字
- asyncio 装饰器(python3.4 引入)
- async、await 关键字(python3.5 引入)
目前最主流的是使用 async、await 关键字。
下面,对实现协程的方法逐一讲解
# greenlet
由于 greelnet 是一个第三方模块,在使用之前,需要安装该模块
pip install greenlet |
在使用时,我们先将函数封装到 greenlet
里,生成 gr1
对象,之后,我们可以在函数中使用 switch ()
进行跳转
from greenlet import greenlet | |
def func1 (): | |
print (1) # 第 2 步:输出 1 | |
gr2.switch () # 第 3 步:切换到 func2 | |
print (2) # 第 6 步:输出 2 | |
gr2.switch () # 第 7 步: 切换到 func2,从上一次执行的位置继续向后执行 | |
def func2 (): | |
print (3) # 第 4 步: 输出 3 | |
gr1.switch () # 第 5 步: 切换到 func1,从上一次执行的位置继续向后执行 | |
print (4) # 第 8 步:输出 4 | |
gr1 = greenlet (func1) | |
gr2 = greenlet (func2) | |
gr1.switch () # 第 1 步:执行 func1 |
# yield 关键字
** 这种方式较为牵强,一般不会使用 **
yield 是 python 生成器关键字
当在生成器函数中使用 yield
语句时,函数的执行将会暂停,并将 yield
后面的表达式作为当前迭代的值返回。
然后,每次调用生成器的 next ()
方法或使用 for
循环进行迭代时,函数会从上次暂停的地方继续执行,直到再次遇到 yield
语句。这样,生成器函数可以逐步产生值,而不需要一次性计算并返回所有结果。
def func1 (): | |
yield 1 | |
yield from func2 () | |
yield 2 | |
def func2 (): | |
yield 3 | |
yield 4 | |
f1 = func1 () | |
for item in f1: | |
print (item) |
# asyncio
在 ptrhon 3.4 及以后的版本引入了 asyncio (python 3.4 - 3.7 )
@asyncio.coroutine
在 Python 3.8 + 已被标记为过时(deprecated)
在 Python 3.11 + 中该装饰器已被移除
import asyncio | |
@asyncio.coroutine | |
def func1 (): | |
print (1) | |
yield from asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务 | |
print (2) | |
@asyncio.coroutine | |
def func2 (): | |
print (3) | |
yield from asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务 | |
print (4) | |
tasks = [ | |
asyncio.ensure_future (func1 ()), | |
asyncio.ensure_future (func2 ()) | |
] | |
loop = asyncio.get_event_loop () | |
loop.run_until_complete (asyncio.wait (tasks)) |
# async & await 关键字
async & await 语法的本质与上述 asyncio 一致,只是写法更加简洁,故而现在写法一般用 async & await (python ≥ 3.5)
在定义函数时使用 async def,使用 await 处理 IO 操作
import asyncio | |
async def func1 (): | |
print (1) | |
await asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务 | |
print (2) | |
async def func2 (): | |
print (3) | |
await asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务 | |
print (4) | |
tasks = [ | |
asyncio.ensure_future (func1 ()), | |
asyncio.ensure_future (func2 ()) | |
] | |
loop = asyncio.get_event_loop () | |
loop.run_until_complete (asyncio.wait (tasks)) |
# 协程的意义
在一个线程中,如果遇到 IO 阻塞,线程会利用空闲时间去处理其余的事件。
在得到时间片时,如果遇到 IO ,使用时间片处理别的事件。
在上述代码中,我们使用 sleep () 来模拟 IO 操作,可能这种示例并不清晰,实际上, 协程在 并发、爬虫、网络通信、任务队列、流处理 等场景均有广泛应用。
下面,我们举一个更加实际的例子,从网上下载六张图片,保存到代码所在根目录下的 images 目录
import requests | |
import time | |
import os | |
def creat_images_dir () -> str: | |
""" | |
获取 images 路径,不存在则创建 | |
""" | |
# 获取当前代码的根目录 | |
root_directory = os.getcwd () | |
# 在根目录下创建一个名为 images 的目录 | |
images_directory = os.path.join (root_directory, 'images') | |
if not os.path.exists (images_directory): | |
print (f"已在根目录下创建 images 目录: {images_directory}") | |
os.makedirs (images_directory) | |
return images_directory | |
print (f"目录 images 已存在:{images_directory}") | |
return images_directory | |
def download_image (url: str): | |
""" | |
从 url 下载图片并保存到 images 下 | |
""" | |
print (f"[+] 开始下载图片",end="->") | |
# 发送网络请求 | |
responce = requests.get (url) | |
print (f"[+] 下载完成: {url}") | |
# 从 URL 中提取文件名,保存到本地目录 | |
filename = url.split ('/')[-1] | |
file_path = os.path.join (images_dir, filename) | |
with open (file_path, mode='wb') as file_object: | |
file_object.write (responce.content) | |
if __name__ == '__main__': | |
url_list = [ | |
"https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", | |
"https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", | |
"https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", | |
"https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", | |
"https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", | |
"https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" | |
] | |
global images_dir | |
images_dir = creat_images_dir () | |
start_time = time.time () | |
for item in url_list: | |
download_image (item) | |
exec_time = time.time () - start_time | |
print (f"[+] 下载完毕,耗时 {exec_time}") |
在上述代码中,函数依次下载并保存每一张图片,只有当前一张图片已经下载完毕,才开始下一张图片,当前环境运行耗时约为 6 秒
现在,我们稍微修改代码,为了实现协程,我们需要使用 asyncio
库来运行异步任务。同时,由于 requests
库不支持异步操作,我们需要使用 aiohttp
库来替代它进行异步 HTTP 请求。
import aiohttp | |
import asyncio | |
import time | |
import os | |
def creat_images_dir () -> str: | |
""" | |
获取 images 路径,不存在则创建 | |
""" | |
# 获取当前代码的根目录 | |
root_directory = os.getcwd () | |
# 在根目录下创建一个名为 images 的目录 | |
images_directory = os.path.join (root_directory, 'images') | |
if not os.path.exists (images_directory): | |
print (f"已在根目录下创建 images 目录: {images_directory}") | |
os.makedirs (images_directory) | |
return images_directory | |
print (f"目录 images 已存在:{images_directory}") | |
return images_directory | |
async def download_image (url: str, session): | |
""" | |
从 url 下载图片并保存到 images 下 | |
""" | |
print (f"[+] 开始下载图片",end="->") | |
# 发送网络请求 | |
async with session.get (url) as response: | |
if response.status != 200: | |
print (f"[-] {response.status} 下载失败: {url}") | |
return | |
print (f"[+] 下载完成: {url}") | |
# 从 URL 中提取文件名,保存到本地目录 | |
filename = url.split ('/')[-1] | |
file_path = os.path.join (images_dir, filename) | |
with open (file_path, mode='wb') as file_object: | |
file_object.write (await response.read ()) | |
async def main (url_list: list): | |
# 创建 aiohttp 客户端会话 | |
async with aiohttp.ClientSession () as session: | |
# 创建任务列表 | |
tasks = [download_image (url, session) for url in url_list] | |
await asyncio.gather (*tasks) | |
if __name__ == '__main__': | |
url_list = [ | |
"https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", | |
"https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", | |
"https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", | |
"https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", | |
"https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", | |
"https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" | |
] | |
global images_dir | |
images_dir = creat_images_dir () | |
start_time = time.time () | |
# 运行主函数 | |
asyncio.run (main (url_list)) | |
exec_time = time.time () - start_time | |
print (f"[+] 下载完毕,耗时 {exec_time}") |
现在,代码中的下载操作将被异步执行,当前环境下运行耗时约为 2 秒
# 异步编程
# 事件循环
我们可以将事件循环理解成为一个 wile True,不断检查并执行 tasks 池中的任务。
下面,使用伪代码解释事件循环
# 伪代码 | |
任务列表 = [ 任务 1, 任务 2, 任务 3, ... ] | |
wile True: | |
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将 ' 可执行 ' 和 ' 已完成 ' 的任务返回 | |
for 就绪任务 in 可执行的任务列表: | |
执行已就绪的任务 | |
for 已完成的任务 in 已完成的任务列表: | |
从任务列表中移除已完成的任务 | |
if 任务列表 为空,终止循环 |
让我们再回顾一下 async 的早期版本中,如何实现异步:
import asyncio | |
# 生成一个事件循环 | |
loop = asyncio.get_event_loop () | |
# 将任务放入 ' 任务列表 ' | |
loop.run_until_complete (任务列表) |
# 代码模板
这里提供两种代码模板
- 使用
loop
,实现一个简易的 async 函数格式
- 定义 / 创建协程对象
- 定义事件循环对象容器
- 将协程转换为 task 任务
- 将 task 任务放入事件循环对象中触发
import asyncio | |
# 定义协程对象 | |
async def func (name): | |
print ("hello", name) | |
# 1. 创建协程对象 | |
coro = func ('world') | |
# 2. 获取事件循环对象容器 | |
loop = asyncio.get_event_loop () | |
# 3. 将协程对象转换为 task | |
task = asyncio.ensure_future (coro) | |
# 4. 将 task 任务放入事件循环对象中触发 | |
loop.run_until_complete (task) |
上述示例中,第三步可以用
task = loop.create_task (coro)
或task = asyncio.ensure_future (coro)
,区别为,后者可处理更多对象,更加灵活
如果要接收任务列表,则第四步中需要使用
asyncio.wait
处理任务列表,即loop.run_until_complete (asyncio.wait (tasks))
- 直接使用
run
方法,实现一个简易的 async 函数格式,形如
- 定义 / 创建协程对象
- 将协程对象转换为 task 任务
- 在主函数中,使用
await task
等待 task 完成
- 在主函数中,使用
- 使用
asyncio.run
方法运行主函数
- 使用
import asyncio | |
async def func1 () -> None: | |
print ("1") | |
await asyncio.sleep (2) | |
print ("2") | |
async def func2 () -> None: | |
print ("3") | |
await asyncio.sleep (2) | |
print ("4") | |
async def main () -> None: | |
task1: asyncio.Task = asyncio.create_task (func1 ()) | |
task2: asyncio.Task = asyncio.create_task (func2 ()) | |
tasks = [task1, task2] | |
await asyncio.gather (*tasks) | |
if __name__ == "__main__": | |
asyncio.run (main ()) |
下面对异步函数的写法进行逐步拆解,但是我们得先了解两个概念:
协程函数:在定义函数时,在 def 前具备 async 关键字 或 使用装饰器
协程对象:执行写成函数得到的对象
# 协程函数 | |
async def func (): | |
pass | |
result = func () |
定义协程对象之后,我们需要让协程对象进入事件循环,才能执行
import asyncio | |
async def func (): | |
print ("hello") | |
result = func () | |
loop = asyncio.get_event_loop () | |
loop.run_until_complete ( result ) |
在 python 3.7 之后,我们有了更便捷的写法,在逻辑不变的形况下,简化了代码
import asyncio | |
async def func (): | |
print ("hello") | |
result = func () | |
asyncio.run (result) |
asyncio.run ()
需要一个可调用的协程对象,如果想要多个协程函数并发执行,我们需要用到 asyncio.gather ()
可以接受多个协程,在被调用时同时启动。
asyncio.gather ()
import asyncio | |
async def func1 (): | |
print ("1") | |
await asyncio.sleep (2) | |
print ("2") | |
async def func2 (): | |
print ("3") | |
await asyncio.sleep (2) | |
print ("4") | |
async def main (): | |
await asyncio.gather (func1 (), func2 ()) | |
asyncio.run (main ()) |
当然,我们也可以将所有的协程函数装入列表,然后传递给 asyncio.gather ()
,但是在传递时,需要使用 *
解包列表
注意:由于
asyncio.gather ()
需要接收多个协程作为单独的参数,而不是一个列表。
所以将列表tasks
传递给asyncio.gather ()
时,一定要使用*
操作符将列表展开为多个参数,即asyncio.gather (*tasks)
我们只需要修改 main
async def main (): | |
tasks = [func1 (), func2 ()] | |
await asyncio.gather (*tasks) # 使用 * 解包列表 |
虽然直接传递协程对象给 gather () 可以工作,但更规范的写法应该显式创建任务:
async def main (): | |
tasks = [asyncio.create_task (func1 ()), asyncio.create_task (func2 ())] | |
await asyncio.gather (*tasks) |
# await 关键字
await + 可等待对象(协程对象、Future、Task 对象、网络 IO 等)
在异步函数执行中,当遇到 awaite 时,转而执行 asyncio.gather (*tasks)
中其余的异步函数,当某一次 loop 检测到当前异步的 IO 操作执行完毕,再回来继续执行当前函数
await 让主函数需要等待异步完成,如果没有 await,在异步函数 IO 时,主函数直接结束
举个例子
import asyncio | |
async def func (Flag: str) -> str: | |
print (1) | |
await asyncio.sleep (2) | |
print (2) | |
return Flag | |
async def main (): | |
print ("main start") | |
task1 = asyncio.create_task (func ("task1")) | |
task2 = asyncio.create_task (func ("task2")) | |
# result1 = await task1 | |
# result2 = await task2 | |
# print (result1, result2) | |
print ("main end") | |
if __name__ == '__main__': | |
asyncio.run (main ()) |
这里,我们注释掉 await,此时,代码输出为:
main start | |
main end | |
1 | |
1 |
显然,协程正在执行 func,此时两个 func 均遇到了 IO ,只能等待,但是由于 main 没有 await,程序直接结束。
# 示例 1:await + task
import asyncio | |
async def func1 (): | |
print ("1") | |
await asyncio.sleep (2) | |
print ("2") | |
async def func2 (): | |
print ("1") | |
await asyncio.sleep (2) | |
print ("2") | |
async def main (): | |
tasks = [ | |
asyncio.create_task (func1 ()), | |
asyncio.create_task (func2 ()) | |
] | |
await asyncio.gather (*tasks) | |
if __name__ == '__main__': | |
asyncio.run (main ()) |
# 示例 2:await + 协程对象
如下,代码定义了协程函数 others
,其对象 others ()
为一个协程对象,可以使用 await
import asyncio | |
async def others () -> str: | |
print ("start") | |
await asyncio.sleep (2) | |
print ("end") | |
return ' 返回值 ' | |
async def func () -> None: | |
print ("执行协程函数内部代码") | |
# 当遇到 IO 操作时,挂起当前协程(任务),等待 IO 操作完成之后再继续向下执行,当前协程挂起时,事件循环还可以执行其他协程(任务) | |
response = await others () | |
print (f"IO 请求结束,结果为 {response}") | |
if __name__ == '__main__': | |
asyncio.run (func ()) |
# 多 await 顺序执行
有的时候,一段代码里我们需要使用多个 await,但是,第二个 await 的函数需要使用到第一个 await 中的内容
此时,我们需要第一个 await 执行完毕,才执行第二个 await
让我们稍微修改一下上述代码:
async def func () -> None: | |
print ("执行协程函数内部代码") | |
response1 = await others () | |
print (f"IO 请求结束,结果为 {response1}") | |
response2 = await others () | |
print (f"IO 请求结束,结果为 {response2}") |
在代码执行时,遇到第一个 await
,此时对 others ()
内的操作进行等待,并不会执行下面的 respose2
,只有当第一个 await 执行完毕,程序顺次执行,才能继续执行下面的 response2
# 多 await 异步执行
那如果我们就是希望这两个 await 是异步呢,依旧是使用上述的 asyncio.gather (*tasks)
操作:
import asyncio | |
async def others () -> str: | |
print ("start") | |
await asyncio.sleep (2) | |
print ("end") | |
return ' 返回值 ' | |
async def func (limit: int) -> None: | |
print ("执行协程函数内部代码") | |
tasks = [] | |
for _ in range (limit): | |
task: asyncio.Task = asyncio.create_task (others ()) | |
tasks.append (task) | |
# response 将会返回一个列表:[' 返回值 ', ' 返回值 ', ' 返回值 '] | |
responses = await asyncio.gather (*tasks, return_exceptions=True) | |
print (f"IO 请求结束,结果为 {responses}") | |
if __name__ == '__main__': | |
asyncio.run (func (limit=3)) |
在这段代码中, asyncio.create_task (others ())
会将协程对象包装成任务(Task)
我们创建一个任务列表 tasks,使用显示的方式创建每一个 task 并加入 tasks
: asyncio.Task
是一种类型提示(Type Hint),显式声明变量 task 的类型是 asyncio.Task,不改变实际运行行为,只增加代码可读性
task: asyncio.Task = asyncio.create_task (others ()) | |
tasks.append (task) |
# Task 对象
在 python 的官方文档中,对 Task 的描述为
Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。
事件循环使用协同日程调度:一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。
即,task 用于在事件循环中添加多个任务,并并发调度协程,通过 asyncio.create_task ()
创建一个 Task 对象,不需要手动实例化 Task 对象。
示例 1:
注意:示例代码并非标准写法,仅为方便对比
import asyncio | |
async def func (Flag: str) -> str: | |
print (1) | |
await asyncio.sleep (2) | |
print (2) | |
return Flag | |
async def main (): | |
print ("main start") | |
task1 = asyncio.create_task (func ("task1")) | |
task2 = asyncio.create_task (func ("task2")) | |
print ("main end") | |
# 当执行某协程遇到 IO ,自动化切换到其他任务 | |
# 此处 awaite 等待对应协程执行完毕并获取结果 | |
result1 = await task1 | |
result2 = await task2 | |
print (result1, result2) | |
if __name__ == '__main__': | |
asyncio.run (main ()) |
结合我们在之前的示例,我们能够更清楚的理解 Task 的作用:
async def main (): | |
print ("main start") | |
result1 = await func ("func1") | |
result2 = await func ("func2") | |
print (result1, result2) |
在之前的示例中,我们使用 await + 协程对象
,我们可以看到,代码将会执行完毕 func1 之后,再执行 func2
而在此处,我们使用 await + task
,代码将会同时执行 task1 和 task2
这是因为:
await + task
:task1
和task2
几乎同时启动。asyncio.create_task ()
将func ("task1")
和func ("task2")
放入事件循环中,允许它们并发运行。await + 协程对象
:func ("func1")
首先被调用,并且在await asyncio.sleep (2)
处挂起。在这两秒钟内,没有其他任务在运行,因为main ()
函数在等待func ("func1")
完成。只有当func ("func1")
完成后,func ("func2")
才开始执行。
一般的,我们会将 task 加入到 list 里,再使用 asyncio.gather (*tasks)
或者 asyncio.wait (task)
# task.result ( )
使用 task.result ()
获取返回值
import asyncio | |
async def func1 (x) -> None: | |
print ("1") | |
await asyncio.sleep (2) | |
print ("2") | |
return x*x | |
async def func2 (y) -> None: | |
print ("3") | |
await asyncio.sleep (2) | |
print ("4") | |
return y*y | |
async def main () -> None: | |
task1: asyncio.Task = asyncio.create_task (func1 (12)) | |
task2: asyncio.Task = asyncio.create_task (func2 (2)) | |
tasks = [task1, task2] | |
await asyncio.gather (*tasks) | |
for task in tasks: | |
print (task.result ()) | |
if __name__ == "__main__": | |
asyncio.run (main ()) |
# 并发运行协程
或许在上述示例中你会发现,并发运行协程时,我们使用了两种方式asyncio.wait
和 asyncio.gather
,两个函数都用于将多个协程(coroutines)或期程(futures)聚集在一起,并等待它们全部完成。
# asyncio.gather
asyncio.gather
的基本用法例如:
import asyncio | |
async def task (n): | |
await asyncio.sleep (n) | |
return n | |
async def main (): | |
# 创建任务列表 | |
tasks = [task (i) for i in range (1, 4)] | |
# 使用 asyncio.gather 并发运行所有任务 | |
results = await asyncio.gather (*tasks) | |
# 打印结果 | |
print (results) | |
asyncio.run (main ()) |
asyncio.gather
函数定义如下
(function) def gather ( | |
*coros_or_futures: _FutureLike [_T@gather], | |
return_exceptions: Literal [False] = False | |
) -> Future [list [_T@gather]] |
*coros_or_futures
:gathe
接受一个可迭代的参数,通常是一个协程列表。使用时需要解包(*tasks)。return_exceptions
: 参数为 False(默认值),任何一个协程抛出异常都会导致gather
抛出异常并停止其他协程。如果return_exceptions
为True
,异常会被捕获并作为结果列表的一部分返回。- 返回值:
gather
返回一个包含所有协程结果的列表。
# asyncio.wait
asyncio.wait
的基本用法例如:
import asyncio | |
async def task (n): | |
await asyncio.sleep (n) | |
return n | |
async def main (): | |
tasks = [task (i) for i in range (1, 4)] | |
# 等待最多 3 秒 | |
done, pending = await asyncio.wait (tasks, timeout=3) # timeout 也可以设置 None | |
# 处理完成的任务 | |
for done_task in done: | |
result = await done_task | |
print (f"Task completed with result: {result}") | |
# 处理未完成的任务 | |
for pending_task in pending: | |
print (f"Task is still pending: {pending_task}") | |
asyncio.run (main ()) |
done
集合包含所有已经完成的协程或期程(futures)pending
集合包含所有尚未完成的协程或期程。
asyncio.gather
函数定义如下
(function) def wait ( | |
fs: Iterable [_FT@wait], | |
*, | |
timeout: float | None = None, | |
return_when: str = "ALL_COMPLETED" | |
) -> Coroutine [Any, Any, tuple [set [_FT@wait], set [_FT@wait]]] |
tasks
:wait
接受一个协程列表,不需要解包。返回值:wait 返回一个元组,包含两个集合:(done, pending)。done 是已经完成的协程的集合,pending 是尚未完成的协程的集合。要获取结果,你需要从 done 集合中的每个协程调用 .result () 方法。
timeout
:wait
可以设置一个超时时间,如果在指定的时间内协程没有完成,wait
会返回。return_when
: 参数用于指定函数何时返回。它是一个字符串,有三个可能的值,每个值都定义了不同的返回条件:ALL_COMPLETED
(默认值):asyncio.wait
会等待所有的协程或期程(futures)完成后再返回。这意味着只有当所有传入的协程或期程都执行完毕后,wait 函数才会返回。
FIRST_COMPLETED
:asyncio.wait
会在第一个协程或期程完成时立即返回。不管其他协程或期程是否完成,只要有一个完成了,wait
就会返回。
FIRST_EXCEPTION
:asyncio.wait
会在第一个协程或期程抛出异常时返回,或者如果所有协程或期程都成功完成,则在所有都完成时返回。如果没有任何协程或期程抛出异常,它的行为就像ALL_COMPLETED
。
错误处理:
wait
不会自动抛出异常。即使有协程抛出异常,wait
也会继续等待其他协程完成。异常需要通过调用协程的.result ()
方法来处理。
# asyncio.Future 对象
Future 是 Task 的基类,Task 对象内部的 await 的处理是基于 Future
示例 1:
代码将一直运行
async def main (): | |
# 获取当前事件循环 | |
loop = asyncio.get_running_loop () | |
# 创建一个任务(Future 对象),这个任务什么都不干 | |
fut = loop.create_future () | |
await fut |
示例 2:
import asyncio | |
async def set_after (fut): | |
await asyncio.sleep (2) | |
fut.set_result ('12') | |
async def main (): | |
# 获取当前事件循环 | |
loop = asyncio.get_running_loop () | |
# 创建一个任务(Future 对象),没有绑定任何行为,任务永远不会结束 | |
fut = loop.create_future () | |
# 创建一个任务(Task 对象),绑定 set_after ,函数内部再 2s 后给 fut 赋值 | |
# 即手动设置了 future 任务的最终结果,此时 fut 结束 | |
await loop.create_task (set_after (fut)) | |
data = await fut | |
print (data) | |
if __name__ == '__main__': | |
asyncio.run (main ()) |
# concurrent.Future 对象
concurrent.Future
对象 与 asyncio.Future
对象没有任何关系,concurrent.Future
是用于实现进程池和线程池异步操作时的对象。
import time | |
from concurrent.futures import Future | |
from concurrent.futures.thread import ThreadPoolExecutor | |
from concurrent.futures.process import ProcessPoolExecutor | |
def func (value): | |
time.sleep (1) | |
print (value) | |
return 123 | |
# 创建线程池 | |
pool = ThreadPoolExecutor (max_workers=5) | |
# 创建进程池 | |
# pool = ProcessPoolExecutor (max_workers=5) | |
for i in range (10): | |
fut = pool.submit (func, i) | |
print (fut) |
在代码中,可能会存在 asyncio.future
和 concurrent.Future
两个对象交叉使用的情况
例如,某 CRM 项目的大部分使用协程异步,但是项目的第三方模块(例如项目数据库使用 MySql)不支持异步的协程,只能使用进程或者线程
import time | |
import asyncio | |
import concurrent.futures | |
def func1 (): | |
# 某耗时操作 | |
time.sleep (2) | |
return "func1 return" | |
async def main (): | |
loop = asyncio.get_running_loop () | |
# 1. run in the default lopp's executor (默认 ThreadpoolExecutor) | |
# 第一步,内部先调用 ThreadpoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数,并返回一个 concurrent.futures.Future 对象 | |
# 第二步,调用 asyncio.wrap_future 将 concurrent.futures.Future 对象 包装为 asyncio.Future 对象 | |
# 因为 concurrent.futures.Future 对象不支持 await 方法,所以需要包装成 asyncio.Future 对象才能使用 | |
fut = loop.run_in_executor (None, func1) | |
result = await fut | |
print ('default thread pool', result) | |
# 2. run in a custom pool: | |
# with concurrent.futures.ThreadPoolExecutor () as pool: | |
# result = await loop.run_in_executor (pool, func1) | |
# print ('custom thread pool', result) | |
# 3. run in a custom process pool: | |
# with concurrent.futures.ProcessPoolExecutor () as pool: | |
# result = await loop.run_in_executor (pool, func1) | |
# print ('custom process pool', result) | |
asyncio.run (main ()) |
案例:异步与非异步模块混合
示例代码:asyncio + requests
import requests | |
import asyncio | |
import time | |
import os | |
def creat_images_dir () -> str: | |
""" | |
获取 images 路径,不存在则创建 | |
""" | |
# 获取当前代码的根目录 | |
root_directory = os.getcwd () | |
# 在根目录下创建一个名为 images 的目录 | |
images_directory = os.path.join (root_directory, 'images') | |
if not os.path.exists (images_directory): | |
print (f"已在根目录下创建 images 目录: {images_directory}") | |
os.makedirs (images_directory) | |
return images_directory | |
print (f"目录 images 已存在:{images_directory}") | |
return images_directory | |
async def download_image (url: str): | |
""" | |
从 url 下载图片并保存到 images 下 | |
""" | |
print (f"[+] 开始下载图片",end="->") | |
loop = asyncio.get_event_loop () | |
# requests 模块不支持异步操作,所以使用线程池配合实现 | |
future = loop.run_in_executor (None, requests.get, url) | |
response = await future | |
print (f"[+] 下载完成: {url}") | |
# 从 URL 中提取文件名,保存到本地目录 | |
filename = url.split ('/')[-1] | |
file_path = os.path.join (images_dir, filename) | |
with open (file_path, mode='wb') as file_object: | |
file_object.write (response.content) | |
if __name__ == '__main__': | |
url_list = [ | |
"https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", | |
"https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", | |
"https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", | |
"https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", | |
"https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", | |
"https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" | |
] | |
global images_dir | |
images_dir = creat_images_dir () | |
start_time = time.time () | |
# 创建新的事件循环 | |
new_loop = asyncio.new_event_loop () | |
asyncio.set_event_loop (new_loop) | |
# 运行主函数 | |
tasks = [asyncio.ensure_future (download_image (url)) for url in url_list] | |
loop = asyncio.get_event_loop () | |
loop.run_until_complete (asyncio.wait (tasks)) | |
exec_time = time.time () - start_time | |
print (f"[+] 下载完毕,耗时 {exec_time}") |
# 回调函数
有时候,我们在下一步程序开始前,我们需要使用到异步函数中的结果,这是就需要回调,这里提供两种回调方式
task.resualt ()
直接通过 task.resualt ()
输出 task 的返回值,如下示例
import asyncio | |
import random | |
# 定义协程对象 | |
async def set_number (): | |
# 生成一个 0 到 100 之间的随机整数 | |
random_number = random.randint (0, 100) | |
return format (random_number) | |
async def main (): | |
task_set: asyncio.Task = asyncio.create_task (set_number ()) | |
await task_set | |
# 获取返回值 | |
number = task_set.result () | |
print (f'random number :{number}') | |
if __name__ == '__main__': | |
asyncio.run (main ()) |
add_done_callback ()
asyncio 自带有 add_done_callback ()
,允许 task 绑定回调函数,适用于在异步结束后仍需对函数进行一些操作的情况
使用方法:
- 定义回调函数,函数接收
future
对象,通过future.result ()
方法获取异步函数返回值 task.add_done_callback ()
绑定回调函数
import asyncio | |
import random | |
# 1. 定义回调函数,接收 future,使用 future.result () 方法获取异步函数返回值 | |
def callback (future): | |
# 模拟对异步函数返回值的进一步操作 | |
number_info = future.result () - 100 | |
print (number_info) | |
return number_info | |
# 定义协程对象 | |
async def set_number (): | |
# 生成一个 0 到 100 之间的随机整数 | |
random_number = random.randint (0, 100) | |
return random_number | |
async def main (): | |
task_set: asyncio.Task = asyncio.create_task (set_number ()) | |
await task_set | |
# 2. 绑定回调函数,task 结束后自动调用 callback | |
task_set.add_done_callback (callback) | |
if __name__ == '__main__': | |
asyncio.run (main ()) |