Asyncio

为了提升性能,例如tronado、fastapi、django 3.x asgi、aiohttp等,越来越多的框架都在想异步方向发展。
本篇将讲解协程基础和 asyncio 相关内容,python 3.11 官方文档:协程与任务

协程

协程( coroutine )也可以被称为微线程,是一种用户态的上下文切换技术,简而言之,就是通过一个线程实现代码块相互切换执行。

与计算机本就提供的线程和进程不同,协程并非计算机提供。而是程序员人为创造的。

举个例子,对于如下代码,根据代码执行顺序,会先执行func1,然后执行func2,那如果我们希望在执行func1的过程中,切换到func2,执行部分代码再切回func1,该怎么做呢,这就需要用到协程。

def func1(): print(1) ... print(2) def func2(): print(3) ... print(4) func1() func2()

实现协程的方法

其实,实现协程有很多方法,例如

  • 使用第三方模块:greenlet(早期模块)
  • yield 关键字
  • asyncio装饰器(python3.4引入)
  • async、await关键字(python3.5引入)

目前最主流的是使用async、await关键字。

下面,对实现协程的方法逐一讲解

greenlet

由于 greelnet 是一个第三方模块,在使用之前,需要安装该模块

pip install greenlet

在使用时,我们先将函数封装到 greenlet 里,生成 gr1 对象,之后,我们可以在函数中使用 switch() 进行跳转

from greenlet import greenlet def func1(): print(1) # 第 2 步:输出 1 gr2.switch() # 第 3 步:切换到 func2 print(2) # 第 6 步:输出 2 gr2.switch() # 第 7 步: 切换到 func2,从上一次执行的位置继续向后执行 def func2(): print(3) # 第 4 步: 输出3 gr1.switch() # 第 5 步: 切换到 func1,从上一次执行的位置继续向后执行 print(4) # 第 8 步:输出 4 gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() # 第 1 步:执行 func1

yield 关键字

这种方式较为牵强,一般不会使用

yield 是python生成器关键字

当在生成器函数中使用 yield 语句时,函数的执行将会暂停,并将 yield 后面的表达式作为当前迭代的值返回。

然后,每次调用生成器的 next () 方法或使用 for 循环进行迭代时,函数会从上次暂停的地方继续执行,直到再次遇到 yield 语句。这样,生成器函数可以逐步产生值,而不需要一次性计算并返回所有结果。

def func1(): yield 1 yield from func2() yield 2 def func2(): yield 3 yield 4 f1 = func1() for item in f1: print(item)

asyncio

在ptrhon 3.4 及以后的版本引入了 asyncio (python 3.4 - 3.7 )

@asyncio.coroutine 在Python 3.8+已被标记为过时(deprecated)

在Python 3.11+中该装饰器已被移除

import asyncio @asyncio.coroutine def func1(): print(1) yield from asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务 print(2) @asyncio.coroutine def func2(): print(3) yield from asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))

async & await 关键字

async & await 语法的本质与上述 asyncio 一致,只是写法更加简洁,故而现在写法一般用 async & await (python ≥ 3.5)

在定义函数时使用 async def,使用 await 处理IO操作

import asyncio async def func1(): print(1) await asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务 print(2) async def func2(): print(3) await asyncio.sleep(2) # 遇到IO耗时操作,自动切换到tasks中的其他任务 print(4) tasks = [ asyncio.ensure_future(func1()), asyncio.ensure_future(func2()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks))

协程的意义

在一个线程中,如果遇到 IO 阻塞,线程会利用空闲时间去处理其余的事件。

在得到时间片时,如果遇到 IO ,使用时间片处理别的事件。

在上述代码中,我们使用 sleep() 来模拟 IO 操作,可能这种示例并不清晰,实际上, 协程在 并发、爬虫、网络通信、任务队列、流处理 等场景均有广泛应用。

下面,我们举一个更加实际的例子,从网上下载六张图片,保存到代码所在根目录下的images目录

import requests import time import os def creat_images_dir() -> str: """ 获取images路径, 不存在则创建 """ # 获取当前代码的根目录 root_directory = os.getcwd() # 在根目录下创建一个名为images的目录 images_directory = os.path.join(root_directory, 'images') if not os.path.exists(images_directory): print(f"已在根目录下创建images目录: {images_directory}") os.makedirs(images_directory) return images_directory print(f"目录 images 已存在:{images_directory}") return images_directory def download_image(url: str): """ 从url下载图片并保存到images下 """ print(f"[+] 开始下载图片",end=" -> ") # 发送网络请求 responce = requests.get(url) print(f"[+] 下载完成: {url}") # 从URL中提取文件名,保存到本地目录 filename = url.split('/')[-1] file_path = os.path.join(images_dir, filename) with open(file_path, mode='wb') as file_object: file_object.write(responce.content) if __name__ == '__main__': url_list = [ "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" ] global images_dir images_dir = creat_images_dir() start_time = time.time() for item in url_list: download_image(item) exec_time = time.time() - start_time print(f"[+] 下载完毕, 耗时{exec_time}")

在上述代码中,函数依次下载并保存每一张图片,只有当前一张图片已经下载完毕,才开始下一张图片,当前环境运行耗时约为 6秒

现在,我们稍微修改代码,为了实现协程,我们需要使用 asyncio 库来运行异步任务。同时,由于 requests 库不支持异步操作,我们需要使用 aiohttp 库来替代它进行异步HTTP请求。

import aiohttp import asyncio import time import os def creat_images_dir() -> str: """ 获取images路径, 不存在则创建 """ # 获取当前代码的根目录 root_directory = os.getcwd() # 在根目录下创建一个名为images的目录 images_directory = os.path.join(root_directory, 'images') if not os.path.exists(images_directory): print(f"已在根目录下创建images目录: {images_directory}") os.makedirs(images_directory) return images_directory print(f"目录 images 已存在:{images_directory}") return images_directory async def download_image(url: str, session): """ 从url下载图片并保存到images下 """ print(f"[+] 开始下载图片",end=" -> ") # 发送网络请求 async with session.get(url) as response: if response.status != 200: print(f"[-] {response.status}下载失败: {url}") return print(f"[+] 下载完成: {url}") # 从URL中提取文件名,保存到本地目录 filename = url.split('/')[-1] file_path = os.path.join(images_dir, filename) with open(file_path, mode='wb') as file_object: file_object.write(await response.read()) async def main(url_list: list): # 创建 aiohttp 客户端会话 async with aiohttp.ClientSession() as session: # 创建任务列表 tasks = [download_image(url, session) for url in url_list] await asyncio.gather(*tasks) if __name__ == '__main__': url_list = [ "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" ] global images_dir images_dir = creat_images_dir() start_time = time.time() # 运行主函数 asyncio.run(main(url_list)) exec_time = time.time() - start_time print(f"[+] 下载完毕, 耗时{exec_time}")

现在,代码中的下载操作将被异步执行,当前环境下运行耗时约为 2秒

异步编程

事件循环

我们可以将事件循环理解成为一个wile True,不断检查并执行tasks 池中的任务。

下面,使用伪代码解释事件循环

# 伪代码 任务列表 = [ 任务1, 任务2, 任务3, ... ] wile True: 可执行的任务列表, 已完成的任务列表 = 去任务列表中检查所有的任务, 将 '可执行''已完成' 的任务返回 for 就绪任务 in 可执行的任务列表: 执行已就绪的任务 for 已完成的任务 in 已完成的任务列表: 从任务列表中移除已完成的任务 if 任务列表 为空, 终止循环

让我们再回顾一下 async 的早期版本中,如何实现异步:

import asyncio # 生成一个事件循环 loop = asyncio.get_event_loop() # 将任务放入'任务列表' loop.run_until_complete(任务列表)

代码模板

这里提供两种代码模板

  1. 使用 loop ,实现一个简易的 async 函数格式
  • 1.定义/创建协程对象
  • 2.定义事件循环对象容器
  • 3.将协程转换为task任务
  • 4.将task任务放入事件循环对象中触发
import asyncio # 定义协程对象 async def func(name): print("hello", name) # 1.创建协程对象 coro = func('world') # 2.获取事件循环对象容器 loop = asyncio.get_event_loop() # 3.将协程对象转换为task task = asyncio.ensure_future(coro) # 4.将task任务放入事件循环对象中触发 loop.run_until_complete(task)

上述示例中,第三步可以用 task = loop.create_task(coro)task = asyncio.ensure_future(coro),区别为,后者可处理更多对象,更加灵活

如果要接收任务列表,则第四步中需要使用 asyncio.wait 处理任务列表,即 loop.run_until_complete(asyncio.wait(tasks))

  1. 直接使用 run 方法,实现一个简易的 async 函数格式,形如
  • 1.定义/创建协程对象
  • 2.将协程对象转换为task任务
  • 3.在主函数中,使用 await task 等待task完成
  • 4.使用asyncio.run方法运行主函数
import asyncio async def func1() -> None: print("1") await asyncio.sleep(2) print("2") async def func2() -> None: print("3") await asyncio.sleep(2) print("4") async def main() -> None: task1: asyncio.Task = asyncio.create_task(func1()) task2: asyncio.Task = asyncio.create_task(func2()) tasks = [task1, task2] await asyncio.gather(*tasks) if __name__ == "__main__": asyncio.run(main())

下面对异步函数的写法进行逐步拆解,但是我们得先了解两个概念:

  • 协程函数: 在定义函数时,在 def 前具备 async 关键字 或 使用装饰器

  • 协程对象:执行写成函数得到的对象

# 协程函数 async def func(): pass result = func()

定义协程对象之后,我们需要让协程对象进入事件循环,才能执行

import asyncio async def func(): print("hello") result = func() loop = asyncio.get_event_loop() loop.run_until_complete( result )

在 python 3.7 之后,我们有了更便捷的写法,在逻辑不变的形况下,简化了代码

import asyncio async def func(): print("hello") result = func() asyncio.run(result)

asyncio.run() 需要一个可调用的协程对象,如果想要多个协程函数并发执行,我们需要用到 asyncio.gather() 可以接受多个协程,在被调用时同时启动。

asyncio.gather()

import asyncio async def func1(): print("1") await asyncio.sleep(2) print("2") async def func2(): print("3") await asyncio.sleep(2) print("4") async def main(): await asyncio.gather(func1(), func2()) asyncio.run(main())

当然,我们也可以将所有的协程函数装入列表,然后传递给 asyncio.gather() ,但是在传递时,需要使用 * 解包列表

注意:由于 asyncio.gather()需要接收多个协程作为单独的参数,而不是一个列表。
所以将列表 tasks 传递给 asyncio.gather() 时,一定要使用 * 操作符将列表展开为多个参数,即 asyncio.gather(*tasks)

我们只需要修改 main

async def main(): tasks = [func1(), func2()] await asyncio.gather(*tasks) # 使用 * 解包列表

虽然直接传递协程对象给gather()可以工作,但更规范的写法应该显式创建任务:

async def main(): tasks = [asyncio.create_task(func1()), asyncio.create_task(func2())] await asyncio.gather(*tasks)

await 关键字

await + 可等待对象(协程对象、Future、Task对象、网络IO等)

在异步函数执行中,当遇到 awaite 时,转而执行 asyncio.gather(*tasks) 中其余的异步函数,当某一次 loop 检测到当前异步的 IO 操作执行完毕,再回来继续执行当前函数

await 让主函数需要等待异步完成,如果没有await,在异步函数 IO 时,主函数直接结束

举个例子

import asyncio async def func(Flag: str) -> str: print(1) await asyncio.sleep(2) print(2) return Flag async def main(): print("main start") task1 = asyncio.create_task(func("task1")) task2 = asyncio.create_task(func("task2")) # result1 = await task1 # result2 = await task2 # print(result1, result2) print("main end") if __name__ == '__main__': asyncio.run(main())

这里,我们注释掉 await,此时,代码输出为:

main start main end 1 1

显然,协程正在执行 func,此时两个func 均遇到了 IO ,只能等待,但是由于 main 没有 await,程序直接结束。

示例1:await + task

import asyncio async def func1(): print("1") await asyncio.sleep(2) print("2") async def func2(): print("1") await asyncio.sleep(2) print("2") async def main(): tasks = [ asyncio.create_task(func1()), asyncio.create_task(func2()) ] await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main())

示例2:await + 协程对象

如下,代码定义了协程函数 others,其对象 others() 为一个协程对象,可以使用 await

import asyncio async def others() -> str: print("start") await asyncio.sleep(2) print("end") return '返回值' async def func() -> None: print("执行协程函数内部代码") # 当遇到 IO 操作时,挂起当前协程(任务),等待 IO 操作完成之后再继续向下执行,当前协程挂起时,事件循环还可以执行其他协程(任务) response = await others() print(f"IO 请求结束,结果为{response}") if __name__ == '__main__': asyncio.run(func())

多 await 顺序执行

有的时候,一段代码里我们需要使用多个 await,但是,第二个await的函数需要使用到第一个await中的内容

此时,我们需要第一个await执行完毕,才执行第二个await

让我们稍微修改一下上述代码:

async def func() -> None: print("执行协程函数内部代码") response1 = await others() print(f"IO 请求结束,结果为{response1}") response2 = await others() print(f"IO 请求结束,结果为{response2}")

在代码执行时,遇到第一个 await ,此时对 others() 内的操作进行等待,并不会执行下面的 respose2,只有当第一个 await 执行完毕,程序顺次执行,才能继续执行下面的 response2

多await 异步执行

那如果我们就是希望这两个 await 是异步呢,依旧是使用上述的asyncio.gather(*tasks)操作:

import asyncio async def others() -> str: print("start") await asyncio.sleep(2) print("end") return '返回值' async def func(limit: int) -> None: print("执行协程函数内部代码") tasks = [] for _ in range(limit): task: asyncio.Task = asyncio.create_task(others()) tasks.append(task) # response 将会返回一个列表:['返回值', '返回值', '返回值'] responses = await asyncio.gather(*tasks, return_exceptions=True) print(f"IO 请求结束,结果为 {responses}") if __name__ == '__main__': asyncio.run(func(limit=3))

在这段代码中,asyncio.create_task(others()) 会将协程对象包装成任务(Task)

我们创建一个任务列表tasks,使用显示的方式创建每一个task并加入tasks

: asyncio.Task 是一种类型提示(Type Hint),显式声明变量 task 的类型是 asyncio.Task,不改变实际运行行为,只增加代码可读性

task: asyncio.Task = asyncio.create_task(others()) tasks.append(task)

Task 对象

在python的官方文档中,对Task的描述为

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。
事件循环使用协同日程调度: 一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

即,task 用于在事件循环中添加多个任务,并并发调度协程,通过 asyncio.create_task() 创建一个 Task 对象,不需要手动实例化 Task 对象。

示例1:

注意:示例代码并非标准写法,仅为方便对比

import asyncio async def func(Flag: str) -> str: print(1) await asyncio.sleep(2) print(2) return Flag async def main(): print("main start") task1 = asyncio.create_task(func("task1")) task2 = asyncio.create_task(func("task2")) print("main end") # 当执行某协程遇到 IO ,自动化切换到其他任务 # 此处 awaite 等待对应协程执行完毕并获取结果 result1 = await task1 result2 = await task2 print(result1, result2) if __name__ == '__main__': asyncio.run(main())

结合我们在之前的示例,我们能够更清楚的理解 Task 的作用:

async def main(): print("main start") result1 = await func("func1") result2 = await func("func2") print(result1, result2)

在之前的示例中,我们使用 await + 协程对象 ,我们可以看到,代码将会执行完毕 func1 之后,再执行 func2

而在此处,我们使用 await + task ,代码将会同时执行 task1 和 task2

这是因为:

  • await + tasktask1task2 几乎同时启动。asyncio.create_task()func("task1")func("task2")放入事件循环中,允许它们并发运行。
  • await + 协程对象func("func1")首先被调用,并且在await asyncio.sleep(2)处挂起。在这两秒钟内,没有其他任务在运行,因为main()函数在等待func("func1")完成。只有当func("func1")完成后,func("func2")才开始执行。

一般的,我们会将 task 加入到 list 里,再使用 asyncio.gather(*tasks) 或者 asyncio.wait(task)

task.result( )

使用 task.result() 获取返回值

import asyncio async def func1(x) -> None: print("1") await asyncio.sleep(2) print("2") return x*x async def func2(y) -> None: print("3") await asyncio.sleep(2) print("4") return y*y async def main() -> None: task1: asyncio.Task = asyncio.create_task(func1(12)) task2: asyncio.Task = asyncio.create_task(func2(2)) tasks = [task1, task2] await asyncio.gather(*tasks) for task in tasks: print(task.result()) if __name__ == "__main__": asyncio.run(main())

并发运行协程

或许在上述示例中你会发现,并发运行协程时,我们使用了两种方式 asyncio.waitasyncio.gather,两个函数都用于将多个协程(coroutines)或期程(futures)聚集在一起,并等待它们全部完成。

asyncio.gather

asyncio.gather 的基本用法例如:

import asyncio async def task(n): await asyncio.sleep(n) return n async def main(): # 创建任务列表 tasks = [task(i) for i in range(1, 4)] # 使用 asyncio.gather 并发运行所有任务 results = await asyncio.gather(*tasks) # 打印结果 print(results) asyncio.run(main())

asyncio.gather 函数定义如下

(function) def gather( *coros_or_futures: _FutureLike[_T@gather], return_exceptions: Literal[False] = False ) -> Future[list[_T@gather]]
  • *coros_or_futuresgathe 接受一个可迭代的参数,通常是一个协程列表。使用时需要解包(*tasks)。
  • return_exceptions: 参数为 False(默认值),任何一个协程抛出异常都会导致 gather 抛出异常并停止其他协程。如果 return_exceptionsTrue,异常会被捕获并作为结果列表的一部分返回。
  • 返回值:gather 返回一个包含所有协程结果的列表。

asyncio.wait

asyncio.wait 的基本用法例如:

import asyncio async def task(n): await asyncio.sleep(n) return n async def main(): tasks = [task(i) for i in range(1, 4)] # 等待最多3秒 done, pending = await asyncio.wait(tasks, timeout=3) # timeout也可以设置None # 处理完成的任务 for done_task in done: result = await done_task print(f"Task completed with result: {result}") # 处理未完成的任务 for pending_task in pending: print(f"Task is still pending: {pending_task}") asyncio.run(main())

done 集合包含所有已经完成的协程或期程(futures)
pending 集合包含所有尚未完成的协程或期程。

asyncio.gather 函数定义如下

(function) def wait( fs: Iterable[_FT@wait], *, timeout: float | None = None, return_when: str = "ALL_COMPLETED" ) -> Coroutine[Any, Any, tuple[set[_FT@wait], set[_FT@wait]]]
  • taskswait 接受一个协程列表,不需要解包。

  • 返回值:wait 返回一个元组,包含两个集合:(done, pending)。done 是已经完成的协程的集合,pending 是尚未完成的协程的集合。要获取结果,你需要从 done 集合中的每个协程调用 .result() 方法。

  • timeoutwait 可以设置一个超时时间,如果在指定的时间内协程没有完成,wait 会返回。

  • return_when: 参数用于指定函数何时返回。它是一个字符串,有三个可能的值,每个值都定义了不同的返回条件:

    • ALL_COMPLETED(默认值):asyncio.wait 会等待所有的协程或期程(futures)完成后再返回。这意味着只有当所有传入的协程或期程都执行完毕后,wait 函数才会返回。
    • FIRST_COMPLETEDasyncio.wait 会在第一个协程或期程完成时立即返回。不管其他协程或期程是否完成,只要有一个完成了,wait 就会返回。
    • FIRST_EXCEPTIONasyncio.wait 会在第一个协程或期程抛出异常时返回,或者如果所有协程或期程都成功完成,则在所有都完成时返回。如果没有任何协程或期程抛出异常,它的行为就像 ALL_COMPLETED
  • 错误处理:wait 不会自动抛出异常。即使有协程抛出异常,wait 也会继续等待其他协程完成。异常需要通过调用协程的 .result() 方法来处理。

asyncio.Future 对象

Future 是 Task 的基类,Task 对象内部的 await 的处理是基于 Future

示例1:

代码将一直运行

async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),这个任务什么都不干 fut = loop.create_future() await fut

示例2:

import asyncio async def set_after(fut): await asyncio.sleep(2) fut.set_result('12') async def main(): # 获取当前事件循环 loop = asyncio.get_running_loop() # 创建一个任务(Future对象),没有绑定任何行为,任务永远不会结束 fut = loop.create_future() # 创建一个任务(Task对象),绑定 set_after ,函数内部再2s后给fut赋值 # 即手动设置了 future 任务的最终结果,此时fut结束 await loop.create_task(set_after(fut)) data = await fut print(data) if __name__ == '__main__': asyncio.run(main())

concurrent.Future 对象

concurrent.Future 对象 与 asyncio.Future 对象没有任何关系, concurrent.Future 是用于实现进程池和线程池异步操作时的对象。

import time from concurrent.futures import Future from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.process import ProcessPoolExecutor def func(value): time.sleep(1) print(value) return 123 # 创建线程池 pool = ThreadPoolExecutor(max_workers=5) # 创建进程池 # pool = ProcessPoolExecutor(max_workers=5) for i in range(10): fut = pool.submit(func, i) print(fut)

在代码中,可能会存在 asyncio.futureconcurrent.Future 两个对象交叉使用的情况

例如,某 CRM 项目的大部分使用协程异步,但是项目的第三方模块(例如项目数据库使用MySql)不支持异步的协程,只能使用进程或者线程

import time import asyncio import concurrent.futures def func1(): # 某耗时操作 time.sleep(2) return "func1 return" async def main(): loop = asyncio.get_running_loop() # 1. run in the default lopp's executor ( 默认ThreadpoolExecutor ) # 第一步,内部先调用 ThreadpoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数,并返回一个 concurrent.futures.Future 对象 # 第二步,调用 asyncio.wrap_future 将 concurrent.futures.Future 对象 包装为 asyncio.Future 对象 # 因为 concurrent.futures.Future 对象不支持 await 方法,所以需要包装成 asyncio.Future 对象才能使用 fut = loop.run_in_executor(None, func1) result = await fut print('default thread pool', result) # 2. run in a custom pool: # with concurrent.futures.ThreadPoolExecutor() as pool: # result = await loop.run_in_executor(pool, func1) # print('custom thread pool', result) # 3. run in a custom process pool: # with concurrent.futures.ProcessPoolExecutor() as pool: # result = await loop.run_in_executor(pool, func1) # print('custom process pool', result) asyncio.run(main())

案例:异步与非异步模块混合

示例代码:asyncio + requests

import requests import asyncio import time import os def creat_images_dir() -> str: """ 获取images路径, 不存在则创建 """ # 获取当前代码的根目录 root_directory = os.getcwd() # 在根目录下创建一个名为images的目录 images_directory = os.path.join(root_directory, 'images') if not os.path.exists(images_directory): print(f"已在根目录下创建images目录: {images_directory}") os.makedirs(images_directory) return images_directory print(f"目录 images 已存在:{images_directory}") return images_directory async def download_image(url: str): """ 从url下载图片并保存到images下 """ print(f"[+] 开始下载图片",end=" -> ") loop = asyncio.get_event_loop() # requests 模块不支持异步操作,所以使用线程池配合实现 future = loop.run_in_executor(None, requests.get, url) response = await future print(f"[+] 下载完成: {url}") # 从URL中提取文件名,保存到本地目录 filename = url.split('/')[-1] file_path = os.path.join(images_dir, filename) with open(file_path, mode='wb') as file_object: file_object.write(response.content) if __name__ == '__main__': url_list = [ "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg", "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg", "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg", "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg", "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg", "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg" ] global images_dir images_dir = creat_images_dir() start_time = time.time() # 创建新的事件循环 new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) # 运行主函数 tasks = [asyncio.ensure_future(download_image(url)) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) exec_time = time.time() - start_time print(f"[+] 下载完毕, 耗时{exec_time}")

回调函数

有时候,我们在下一步程序开始前,我们需要使用到异步函数中的结果,这是就需要回调,这里提供两种回调方式

  1. task.resualt()

直接通过 task.resualt() 输出 task 的返回值,如下示例

import asyncio import random # 定义协程对象 async def set_number(): # 生成一个0到100之间的随机整数 random_number = random.randint(0, 100) return format(random_number) async def main(): task_set: asyncio.Task = asyncio.create_task(set_number()) await task_set # 获取返回值 number = task_set.result() print(f'random number :{number}') if __name__ == '__main__': asyncio.run(main())
  1. add_done_callback()

asyncio 自带有 add_done_callback() ,允许task绑定回调函数,适用于在异步结束后仍需对函数进行一些操作的情况

使用方法:

  • 定义回调函数,函数接收future对象,通过 future.result() 方法获取异步函数返回值
  • task.add_done_callback() 绑定回调函数
import asyncio import random # 1.定义回调函数,接收 future,使用future.result() 方法获取异步函数返回值 def callback(future): # 模拟对异步函数返回值的进一步操作 number_info = future.result() - 100 print(number_info) return number_info # 定义协程对象 async def set_number(): # 生成一个0到100之间的随机整数 random_number = random.randint(0, 100) return random_number async def main(): task_set: asyncio.Task = asyncio.create_task(set_number()) await task_set # 2.绑定回调函数,task结束后自动调用callback task_set.add_done_callback(callback) if __name__ == '__main__': asyncio.run(main())