为了提升性能,例如 tronado、fastapi、django 3.x asgi、aiohttp 等,越来越多的框架都在想异步方向发展。
本篇将讲解协程基础和 asyncio 相关内容,python 3.11 官方文档:协程与任务

# 协程

协程 ( coroutine ) 也可以被称为微线程,是一种用户态的上下文切换技术,简而言之,就是通过一个线程实现代码块相互切换执行。

与计算机本就提供的线程和进程不同,协程并非计算机提供。而是程序员人为创造的。

举个例子,对于如下代码,根据代码执行顺序,会先执行 func1,然后执行 func2,那如果我们希望在执行 func1 的过程中,切换到 func2,执行部分代码再切回 func1,该怎么做呢,这就需要用到协程。

def func1 ():
    print (1)
    ...
    print (2)
def func2 ():
    print (3)
    ...
    print (4)
func1 ()
func2 ()

# 实现协程的方法

其实,实现协程有很多方法,例如

  • 使用第三方模块:greenlet(早期模块)
  • yield 关键字
  • asyncio 装饰器(python3.4 引入)
  • async、await 关键字(python3.5 引入)

目前最主流的是使用 async、await 关键字。

下面,对实现协程的方法逐一讲解

# greenlet

由于 greelnet 是一个第三方模块,在使用之前,需要安装该模块

pip install greenlet

在使用时,我们先将函数封装到 greenlet 里,生成 gr1 对象,之后,我们可以在函数中使用 switch () 进行跳转

from greenlet import greenlet
def func1 ():
    print (1)    # 第 2 步:输出 1
    gr2.switch ()    # 第 3 步:切换到 func2
    print (2)    # 第 6 步:输出 2
    gr2.switch ()    # 第 7 步: 切换到 func2,从上一次执行的位置继续向后执行
def func2 ():
    print (3)    # 第 4 步: 输出 3
    gr1.switch ()    # 第 5 步: 切换到 func1,从上一次执行的位置继续向后执行
    print (4)    # 第 8 步:输出 4
gr1 = greenlet (func1)
gr2 = greenlet (func2)
gr1.switch ()    # 第 1 步:执行 func1

# yield 关键字

** 这种方式较为牵强,一般不会使用 **

yield 是 python 生成器关键字

当在生成器函数中使用 yield 语句时,函数的执行将会暂停,并将 yield 后面的表达式作为当前迭代的值返回。

然后,每次调用生成器的 next () 方法或使用 for 循环进行迭代时,函数会从上次暂停的地方继续执行,直到再次遇到 yield 语句。这样,生成器函数可以逐步产生值,而不需要一次性计算并返回所有结果。

def func1 ():
    yield 1
    yield from func2 ()
    yield 2
def func2 ():
    yield 3
    yield 4
f1 = func1 ()
for item in f1:
    print (item)

# asyncio

在 ptrhon 3.4 及以后的版本引入了 asyncio (python 3.4 - 3.7 )

@asyncio.coroutine 在 Python 3.8 + 已被标记为过时(deprecated)

在 Python 3.11 + 中该装饰器已被移除

import asyncio
@asyncio.coroutine
def func1 ():
    print (1)
    yield from asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    print (2)
@asyncio.coroutine
def func2 ():
    print (3)
    yield from asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    print (4)
tasks = [
    asyncio.ensure_future (func1 ()),
    asyncio.ensure_future (func2 ())
]
loop = asyncio.get_event_loop ()
loop.run_until_complete (asyncio.wait (tasks))

# async & await 关键字

async & await 语法的本质与上述 asyncio 一致,只是写法更加简洁,故而现在写法一般用 async & await (python ≥ 3.5)

在定义函数时使用 async def,使用 await 处理 IO 操作

import asyncio
async def func1 ():
    print (1)
    await asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    print (2)
async def func2 ():
    print (3)
    await asyncio.sleep (2) # 遇到 IO 耗时操作,自动切换到 tasks 中的其他任务
    print (4)
tasks = [
    asyncio.ensure_future (func1 ()),
    asyncio.ensure_future (func2 ())
]
loop = asyncio.get_event_loop ()
loop.run_until_complete (asyncio.wait (tasks))

# 协程的意义

在一个线程中,如果遇到 IO 阻塞,线程会利用空闲时间去处理其余的事件。

在得到时间片时,如果遇到 IO ,使用时间片处理别的事件。

在上述代码中,我们使用 sleep () 来模拟 IO 操作,可能这种示例并不清晰,实际上, 协程在 并发、爬虫、网络通信、任务队列、流处理 等场景均有广泛应用。

下面,我们举一个更加实际的例子,从网上下载六张图片,保存到代码所在根目录下的 images 目录

import requests
import time
import os
def creat_images_dir () -> str:
    """
    获取 images 路径,不存在则创建
    """
    # 获取当前代码的根目录
    root_directory = os.getcwd ()
    # 在根目录下创建一个名为 images 的目录
    images_directory = os.path.join (root_directory, 'images')
    if not os.path.exists (images_directory):
        print (f"已在根目录下创建 images 目录: {images_directory}")
        os.makedirs (images_directory)
        return images_directory
    print (f"目录 images 已存在:{images_directory}")
    return images_directory
def download_image (url: str):
    """
    从 url 下载图片并保存到 images 下
    """
    print (f"[+] 开始下载图片",end="->")
    # 发送网络请求
    responce = requests.get (url)
    print (f"[+] 下载完成: {url}")
    # 从 URL 中提取文件名,保存到本地目录
    filename = url.split ('/')[-1]
    file_path = os.path.join (images_dir, filename)
    with open (file_path, mode='wb') as file_object:
        file_object.write (responce.content)
if __name__ == '__main__':
    url_list = [
        "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg",
        "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg",
        "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg",
        "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg",
        "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg",
        "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg"
    ]
    global images_dir 
    images_dir = creat_images_dir ()
    start_time = time.time ()
    for item in url_list:
        download_image (item)
    exec_time = time.time () - start_time
    print (f"[+] 下载完毕,耗时 {exec_time}")

在上述代码中,函数依次下载并保存每一张图片,只有当前一张图片已经下载完毕,才开始下一张图片,当前环境运行耗时约为 6 秒

现在,我们稍微修改代码,为了实现协程,我们需要使用 asyncio 库来运行异步任务。同时,由于 requests 库不支持异步操作,我们需要使用 aiohttp 库来替代它进行异步 HTTP 请求。

import aiohttp
import asyncio
import time
import os
def creat_images_dir () -> str:
    """
    获取 images 路径,不存在则创建
    """
    # 获取当前代码的根目录
    root_directory = os.getcwd ()
    # 在根目录下创建一个名为 images 的目录
    images_directory = os.path.join (root_directory, 'images')
    if not os.path.exists (images_directory):
        print (f"已在根目录下创建 images 目录: {images_directory}")
        os.makedirs (images_directory)
        return images_directory
    print (f"目录 images 已存在:{images_directory}")
    return images_directory
async def download_image (url: str, session):
    """
    从 url 下载图片并保存到 images 下
    """
    print (f"[+] 开始下载图片",end="->")
    # 发送网络请求
    async with session.get (url) as response:
        if response.status != 200:
            print (f"[-] {response.status} 下载失败: {url}")
            return
        print (f"[+] 下载完成: {url}")
        # 从 URL 中提取文件名,保存到本地目录
        filename = url.split ('/')[-1]
        file_path = os.path.join (images_dir, filename)
        with open (file_path, mode='wb') as file_object:
            file_object.write (await response.read ())
async def main (url_list: list):
    # 创建 aiohttp 客户端会话
    async with aiohttp.ClientSession () as session:
        # 创建任务列表
        tasks = [download_image (url, session) for url in url_list]
        await asyncio.gather (*tasks)
if __name__ == '__main__':
    url_list = [
        "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg",
        "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg",
        "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg",
        "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg",
        "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg",
        "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg"
    ]
    global images_dir 
    images_dir = creat_images_dir ()
    start_time = time.time ()
    # 运行主函数
    asyncio.run (main (url_list))
    exec_time = time.time () - start_time
    print (f"[+] 下载完毕,耗时 {exec_time}")

现在,代码中的下载操作将被异步执行,当前环境下运行耗时约为 2 秒

# 异步编程

# 事件循环

我们可以将事件循环理解成为一个 wile True,不断检查并执行 tasks 池中的任务。

下面,使用伪代码解释事件循环

# 伪代码
任务列表 = [ 任务 1, 任务 2, 任务 3, ... ]
wile True:
    可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将 ' 可执行 '' 已完成 ' 的任务返回
    for 就绪任务 in 可执行的任务列表:
        执行已就绪的任务
    
    for 已完成的任务 in 已完成的任务列表:
        从任务列表中移除已完成的任务
    
    if 任务列表 为空,终止循环

让我们再回顾一下 async 的早期版本中,如何实现异步:

import asyncio
# 生成一个事件循环
loop = asyncio.get_event_loop ()
# 将任务放入 ' 任务列表 '
loop.run_until_complete (任务列表)

# 代码模板

这里提供两种代码模板

  1. 使用 loop ,实现一个简易的 async 函数格式
    1. 定义 / 创建协程对象
    1. 定义事件循环对象容器
    1. 将协程转换为 task 任务
    1. 将 task 任务放入事件循环对象中触发
import asyncio
# 定义协程对象
async def func (name):
    print ("hello", name)
# 1. 创建协程对象
coro = func ('world')
# 2. 获取事件循环对象容器
loop = asyncio.get_event_loop ()
# 3. 将协程对象转换为 task
task = asyncio.ensure_future (coro)
# 4. 将 task 任务放入事件循环对象中触发
loop.run_until_complete (task)

上述示例中,第三步可以用 task = loop.create_task (coro)task = asyncio.ensure_future (coro) ,区别为,后者可处理更多对象,更加灵活

如果要接收任务列表,则第四步中需要使用 asyncio.wait 处理任务列表,即 loop.run_until_complete (asyncio.wait (tasks))

  1. 直接使用 run 方法,实现一个简易的 async 函数格式,形如
    1. 定义 / 创建协程对象
    1. 将协程对象转换为 task 任务
    1. 在主函数中,使用 await task 等待 task 完成
    1. 使用 asyncio.run 方法运行主函数
import asyncio
async def func1 () -> None:
    print ("1")
    await asyncio.sleep (2)
    print ("2")
async def func2 () -> None:
    print ("3")
    await asyncio.sleep (2)
    print ("4")
async def main () -> None:
    task1: asyncio.Task = asyncio.create_task (func1 ())
    task2: asyncio.Task = asyncio.create_task (func2 ())
    tasks = [task1, task2]
    await asyncio.gather (*tasks)
if __name__ == "__main__":
    asyncio.run (main ())

下面对异步函数的写法进行逐步拆解,但是我们得先了解两个概念:

  • 协程函数:在定义函数时,在 def 前具备 async 关键字 或 使用装饰器

  • 协程对象:执行写成函数得到的对象

# 协程函数
async def func ():
    pass
result = func ()

定义协程对象之后,我们需要让协程对象进入事件循环,才能执行

import asyncio
async def func ():
    print ("hello")
result = func ()
loop = asyncio.get_event_loop ()
loop.run_until_complete ( result )

在 python 3.7 之后,我们有了更便捷的写法,在逻辑不变的形况下,简化了代码

import asyncio
async def func ():
    print ("hello")
result = func ()
asyncio.run (result)

asyncio.run () 需要一个可调用的协程对象,如果想要多个协程函数并发执行,我们需要用到 asyncio.gather () 可以接受多个协程,在被调用时同时启动。

asyncio.gather ()

import asyncio
async def func1 ():
    print ("1")
    await asyncio.sleep (2)
    print ("2")
async def func2 ():
    print ("3")
    await asyncio.sleep (2)
    print ("4")
async def main ():
    await asyncio.gather (func1 (), func2 ())
asyncio.run (main ())

当然,我们也可以将所有的协程函数装入列表,然后传递给 asyncio.gather () ,但是在传递时,需要使用 * 解包列表

注意:由于 asyncio.gather () 需要接收多个协程作为单独的参数,而不是一个列表。
所以将列表 tasks 传递给 asyncio.gather () 时,一定要使用 * 操作符将列表展开为多个参数,即 asyncio.gather (*tasks)

我们只需要修改 main

async def main ():
    tasks = [func1 (), func2 ()]
    await asyncio.gather (*tasks)    # 使用 * 解包列表

虽然直接传递协程对象给 gather () 可以工作,但更规范的写法应该显式创建任务:

async def main ():
    tasks = [asyncio.create_task (func1 ()), asyncio.create_task (func2 ())]
    await asyncio.gather (*tasks)

# await 关键字

await + 可等待对象(协程对象、Future、Task 对象、网络 IO 等)

在异步函数执行中,当遇到 awaite 时,转而执行 asyncio.gather (*tasks) 中其余的异步函数,当某一次 loop 检测到当前异步的 IO 操作执行完毕,再回来继续执行当前函数

await 让主函数需要等待异步完成,如果没有 await,在异步函数 IO 时,主函数直接结束

举个例子

import asyncio
async def func (Flag: str) -> str:
    print (1)
    await asyncio.sleep (2)
    print (2)
    return Flag
async def main ():
    print ("main start")
    task1 = asyncio.create_task (func ("task1"))
    task2 = asyncio.create_task (func ("task2"))
    
    # result1 = await task1
    # result2 = await task2 
    # print (result1, result2)
    print ("main end")
if __name__ == '__main__':
    asyncio.run (main ())

这里,我们注释掉 await,此时,代码输出为:

main start
main end
1
1

显然,协程正在执行 func,此时两个 func 均遇到了 IO ,只能等待,但是由于 main 没有 await,程序直接结束。

# 示例 1:await + task

import asyncio
async def func1 ():
    print ("1")
    await asyncio.sleep (2)
    print ("2")
async def func2 ():
    print ("1")
    await asyncio.sleep (2)
    print ("2")
async def main ():
    tasks  = [
        asyncio.create_task (func1 ()),
        asyncio.create_task (func2 ())
        ]
    await asyncio.gather (*tasks)
if __name__ == '__main__':
    asyncio.run (main ())

# 示例 2:await + 协程对象

如下,代码定义了协程函数 others ,其对象 others () 为一个协程对象,可以使用 await

import asyncio
async def others () -> str:
    print ("start")
    await asyncio.sleep (2)
    print ("end")
    return ' 返回值 '
async def func () -> None:
    print ("执行协程函数内部代码")
    # 当遇到 IO 操作时,挂起当前协程(任务),等待 IO 操作完成之后再继续向下执行,当前协程挂起时,事件循环还可以执行其他协程(任务)
    response = await others ()
    print (f"IO 请求结束,结果为 {response}")
if __name__ == '__main__':
    asyncio.run (func ())

# 多 await 顺序执行

有的时候,一段代码里我们需要使用多个 await,但是,第二个 await 的函数需要使用到第一个 await 中的内容

此时,我们需要第一个 await 执行完毕,才执行第二个 await

让我们稍微修改一下上述代码:

async def func () -> None:
    print ("执行协程函数内部代码")
    response1 = await others ()
    print (f"IO 请求结束,结果为 {response1}")
    response2 = await others ()
    print (f"IO 请求结束,结果为 {response2}")

在代码执行时,遇到第一个 await ,此时对 others () 内的操作进行等待,并不会执行下面的 respose2 ,只有当第一个 await 执行完毕,程序顺次执行,才能继续执行下面的 response2

# 多 await 异步执行

那如果我们就是希望这两个 await 是异步呢,依旧是使用上述的 asyncio.gather (*tasks) 操作:

import asyncio
async def others () -> str:
    print ("start")
    await asyncio.sleep (2)
    print ("end")
    return ' 返回值 '
async def func (limit: int) -> None: 
    print ("执行协程函数内部代码")
    tasks = []
    for _ in range (limit):
        task: asyncio.Task = asyncio.create_task (others ())
        tasks.append (task)
    # response 将会返回一个列表:[' 返回值 ', ' 返回值 ', ' 返回值 ']
    responses = await asyncio.gather (*tasks, return_exceptions=True)
    print (f"IO 请求结束,结果为 {responses}")
if __name__ == '__main__':
    asyncio.run (func (limit=3))

在这段代码中, asyncio.create_task (others ()) 会将协程对象包装成任务(Task)

我们创建一个任务列表 tasks,使用显示的方式创建每一个 task 并加入 tasks

: asyncio.Task 是一种类型提示(Type Hint),显式声明变量 task 的类型是 asyncio.Task,不改变实际运行行为,只增加代码可读性

task: asyncio.Task = asyncio.create_task (others ())
tasks.append (task)

# Task 对象

在 python 的官方文档中,对 Task 的描述为

Task 对象被用来在事件循环中运行协程。如果一个协程在等待一个 Future 对象,Task 对象会挂起该协程的执行并等待该 Future 对象完成。当该 Future 对象 完成,被打包的协程将恢复执行。
事件循环使用协同日程调度:一个事件循环每次运行一个 Task 对象。而一个 Task 对象会等待一个 Future 对象完成,该事件循环会运行其他 Task、回调或执行 IO 操作。

即,task 用于在事件循环中添加多个任务,并并发调度协程,通过 asyncio.create_task () 创建一个 Task 对象,不需要手动实例化 Task 对象。

示例 1:

注意:示例代码并非标准写法,仅为方便对比

import asyncio
async def func (Flag: str) -> str:
    print (1)
    await asyncio.sleep (2)
    print (2)
    return Flag
async def main ():
    print ("main start")
    task1 = asyncio.create_task (func ("task1"))
    task2 = asyncio.create_task (func ("task2"))
    print ("main end")
    # 当执行某协程遇到 IO ,自动化切换到其他任务
    # 此处 awaite 等待对应协程执行完毕并获取结果
    result1 = await task1
    result2 = await task2 
    print (result1, result2)
if __name__ == '__main__':
    asyncio.run (main ())

结合我们在之前的示例,我们能够更清楚的理解 Task 的作用:

async def main ():
    print ("main start")
    result1 = await func ("func1")
    result2 = await func ("func2")
    print (result1, result2)

在之前的示例中,我们使用 await + 协程对象 ,我们可以看到,代码将会执行完毕 func1 之后,再执行 func2

而在此处,我们使用 await + task ,代码将会同时执行 task1 和 task2

这是因为:

  • await + tasktask1task2 几乎同时启动。 asyncio.create_task ()func ("task1")func ("task2") 放入事件循环中,允许它们并发运行。
  • await + 协程对象func ("func1") 首先被调用,并且在 await asyncio.sleep (2) 处挂起。在这两秒钟内,没有其他任务在运行,因为 main () 函数在等待 func ("func1") 完成。只有当 func ("func1") 完成后, func ("func2") 才开始执行。

一般的,我们会将 task 加入到 list 里,再使用 asyncio.gather (*tasks) 或者 asyncio.wait (task)

# task.result ( )

使用 task.result () 获取返回值

import asyncio
async def func1 (x) -> None:
    print ("1")
    await asyncio.sleep (2)
    print ("2")
    return x*x
async def func2 (y) -> None:
    print ("3")
    await asyncio.sleep (2)
    print ("4")
    return y*y
async def main () -> None:
    task1: asyncio.Task = asyncio.create_task (func1 (12))
    task2: asyncio.Task = asyncio.create_task (func2 (2))
    tasks = [task1, task2]
    await asyncio.gather (*tasks)
    for task in tasks:
        print (task.result ())
if __name__ == "__main__":
    asyncio.run (main ())

# 并发运行协程

或许在上述示例中你会发现,并发运行协程时,我们使用了两种方式
asyncio.waitasyncio.gather ,两个函数都用于将多个协程(coroutines)或期程(futures)聚集在一起,并等待它们全部完成。

# asyncio.gather

asyncio.gather 的基本用法例如:

import asyncio
async def task (n):
    await asyncio.sleep (n)
    return n
async def main ():
    # 创建任务列表
    tasks = [task (i) for i in range (1, 4)]
    
    # 使用 asyncio.gather 并发运行所有任务
    results = await asyncio.gather (*tasks)
    
    # 打印结果
    print (results)
asyncio.run (main ())

asyncio.gather 函数定义如下

(function) def gather (
    *coros_or_futures: _FutureLike [_T@gather],
    return_exceptions: Literal [False] = False
) -> Future [list [_T@gather]]
  • *coros_or_futuresgathe 接受一个可迭代的参数,通常是一个协程列表。使用时需要解包(*tasks)。
  • return_exceptions : 参数为 False(默认值),任何一个协程抛出异常都会导致 gather 抛出异常并停止其他协程。如果 return_exceptionsTrue ,异常会被捕获并作为结果列表的一部分返回。
  • 返回值: gather 返回一个包含所有协程结果的列表。

# asyncio.wait

asyncio.wait 的基本用法例如:

import asyncio
async def task (n):
    await asyncio.sleep (n)
    return n
async def main ():
    tasks = [task (i) for i in range (1, 4)]
    # 等待最多 3 秒
    done, pending = await asyncio.wait (tasks, timeout=3)    # timeout 也可以设置 None
    # 处理完成的任务
    for done_task in done:
        result = await done_task
        print (f"Task completed with result: {result}")
    # 处理未完成的任务
    for pending_task in pending:
        print (f"Task is still pending: {pending_task}")
asyncio.run (main ())

done 集合包含所有已经完成的协程或期程(futures)
pending 集合包含所有尚未完成的协程或期程。

asyncio.gather 函数定义如下

(function) def wait (
    fs: Iterable [_FT@wait],
    *,
    timeout: float | None = None,
    return_when: str = "ALL_COMPLETED"
) -> Coroutine [Any, Any, tuple [set [_FT@wait], set [_FT@wait]]]
  • taskswait 接受一个协程列表,不需要解包。

  • 返回值:wait 返回一个元组,包含两个集合:(done, pending)。done 是已经完成的协程的集合,pending 是尚未完成的协程的集合。要获取结果,你需要从 done 集合中的每个协程调用 .result () 方法。

  • timeoutwait 可以设置一个超时时间,如果在指定的时间内协程没有完成, wait 会返回。

  • return_when : 参数用于指定函数何时返回。它是一个字符串,有三个可能的值,每个值都定义了不同的返回条件:

    • ALL_COMPLETED (默认值): asyncio.wait 会等待所有的协程或期程(futures)完成后再返回。这意味着只有当所有传入的协程或期程都执行完毕后,wait 函数才会返回。
    • FIRST_COMPLETEDasyncio.wait 会在第一个协程或期程完成时立即返回。不管其他协程或期程是否完成,只要有一个完成了, wait 就会返回。
    • FIRST_EXCEPTIONasyncio.wait 会在第一个协程或期程抛出异常时返回,或者如果所有协程或期程都成功完成,则在所有都完成时返回。如果没有任何协程或期程抛出异常,它的行为就像 ALL_COMPLETED
  • 错误处理: wait 不会自动抛出异常。即使有协程抛出异常, wait 也会继续等待其他协程完成。异常需要通过调用协程的 .result () 方法来处理。

# asyncio.Future 对象

Future 是 Task 的基类,Task 对象内部的 await 的处理是基于 Future

示例 1:

代码将一直运行

async def main ():
    # 获取当前事件循环
    loop = asyncio.get_running_loop ()
    # 创建一个任务(Future 对象),这个任务什么都不干
    fut = loop.create_future ()
    await fut

示例 2:

import asyncio
async def set_after (fut):
    await asyncio.sleep (2)
    fut.set_result ('12')
async def main ():
    # 获取当前事件循环
    loop = asyncio.get_running_loop ()
    # 创建一个任务(Future 对象),没有绑定任何行为,任务永远不会结束
    fut = loop.create_future ()
    # 创建一个任务(Task 对象),绑定 set_after ,函数内部再 2s 后给 fut 赋值
    # 即手动设置了 future 任务的最终结果,此时 fut 结束
    await loop.create_task (set_after (fut))
    data = await fut
    print (data)
if __name__ == '__main__':
    asyncio.run (main ())

# concurrent.Future 对象

concurrent.Future 对象 与 asyncio.Future 对象没有任何关系,
concurrent.Future 是用于实现进程池和线程池异步操作时的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func (value):
    time.sleep (1)
    print (value)
    return 123
# 创建线程池
pool = ThreadPoolExecutor (max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor (max_workers=5)
for i in range (10):
    fut = pool.submit (func, i)
    print (fut)

在代码中,可能会存在 asyncio.futureconcurrent.Future 两个对象交叉使用的情况

例如,某 CRM 项目的大部分使用协程异步,但是项目的第三方模块(例如项目数据库使用 MySql)不支持异步的协程,只能使用进程或者线程

import time
import asyncio
import concurrent.futures
def func1 ():
    # 某耗时操作
    time.sleep (2)
    return "func1 return"
async def main ():
    loop = asyncio.get_running_loop ()
    # 1. run in the default lopp's executor (默认 ThreadpoolExecutor)
    # 第一步,内部先调用 ThreadpoolExecutor 的 submit 方法去线程池中申请一个线程去执行 func1 函数,并返回一个 concurrent.futures.Future 对象 
    # 第二步,调用 asyncio.wrap_future 将 concurrent.futures.Future 对象 包装为 asyncio.Future 对象
    # 因为 concurrent.futures.Future 对象不支持 await 方法,所以需要包装成 asyncio.Future 对象才能使用
    fut = loop.run_in_executor (None, func1)
    result = await fut
    print ('default thread pool', result)
    # 2. run in a custom pool:
    # with concurrent.futures.ThreadPoolExecutor () as pool:
    #     result = await loop.run_in_executor (pool, func1)
    #     print ('custom thread pool', result)
    
    # 3. run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor () as pool:
    #     result = await loop.run_in_executor (pool, func1)
    #     print ('custom process pool', result)
    
asyncio.run (main ())

案例:异步与非异步模块混合

示例代码:asyncio + requests

import requests
import asyncio
import time
import os
def creat_images_dir () -> str:
    """
    获取 images 路径,不存在则创建
    """
    # 获取当前代码的根目录
    root_directory = os.getcwd ()
    # 在根目录下创建一个名为 images 的目录
    images_directory = os.path.join (root_directory, 'images')
    if not os.path.exists (images_directory):
        print (f"已在根目录下创建 images 目录: {images_directory}")
        os.makedirs (images_directory)
        return images_directory
    print (f"目录 images 已存在:{images_directory}")
    return images_directory
async def download_image (url: str):
    """
    从 url 下载图片并保存到 images 下
    """
    print (f"[+] 开始下载图片",end="->")
    loop = asyncio.get_event_loop ()
    # requests 模块不支持异步操作,所以使用线程池配合实现
    future = loop.run_in_executor (None, requests.get, url)
    response = await future
    print (f"[+] 下载完成: {url}")
    # 从 URL 中提取文件名,保存到本地目录
    filename = url.split ('/')[-1]
    file_path = os.path.join (images_dir, filename)
    with open (file_path, mode='wb') as file_object:
        file_object.write (response.content)
if __name__ == '__main__':
    url_list = [
        "https://img.timelessq.com/images/2022/07/26/2f015bf5c14d8b24eba440ef80422b8d.jpg",
        "https://img.timelessq.com/images/2022/07/26/662d37b8a580b466da309ec795b7ae55.jpg",
        "https://img.timelessq.com/images/2022/07/26/f53974c854a887e80e9f72685a99854c.jpg",
        "https://img.timelessq.com/images/2022/07/26/a956521148df0241038d6f6838a9806f.jpg",
        "https://img.timelessq.com/images/2022/07/26/1406d3cdecfa0dd6e62f13a601eb9230.jpg",
        "https://img.timelessq.com/images/2022/07/26/d757e4c9c546cd7dba16ee5e23dc2d9d.jpg"
    ]
    global images_dir 
    images_dir = creat_images_dir ()
    start_time = time.time ()
    
    # 创建新的事件循环
    new_loop = asyncio.new_event_loop ()
    asyncio.set_event_loop (new_loop)
    
    # 运行主函数
    tasks = [asyncio.ensure_future (download_image (url)) for url in url_list]
    loop = asyncio.get_event_loop ()
    loop.run_until_complete (asyncio.wait (tasks))
    exec_time = time.time () - start_time
    print (f"[+] 下载完毕,耗时 {exec_time}")

# 回调函数

有时候,我们在下一步程序开始前,我们需要使用到异步函数中的结果,这是就需要回调,这里提供两种回调方式

  1. task.resualt ()

直接通过 task.resualt () 输出 task 的返回值,如下示例

import asyncio
import random
# 定义协程对象
async def set_number ():
    # 生成一个 0 到 100 之间的随机整数
    random_number = random.randint (0, 100)
    return format (random_number)
  
async def main ():
    task_set: asyncio.Task = asyncio.create_task (set_number ())
    await task_set
    # 获取返回值
    number = task_set.result ()
    print (f'random number :{number}')
if __name__ == '__main__':
    asyncio.run (main ())
  1. add_done_callback ()

asyncio 自带有 add_done_callback () ,允许 task 绑定回调函数,适用于在异步结束后仍需对函数进行一些操作的情况

使用方法:

  • 定义回调函数,函数接收 future 对象,通过 future.result () 方法获取异步函数返回值
  • task.add_done_callback () 绑定回调函数
import asyncio
import random
# 1. 定义回调函数,接收 future,使用 future.result () 方法获取异步函数返回值
def callback (future):
    # 模拟对异步函数返回值的进一步操作
    number_info = future.result () - 100
    print (number_info)
    return number_info
# 定义协程对象
async def set_number ():
    # 生成一个 0 到 100 之间的随机整数
    random_number = random.randint (0, 100)
    return random_number
  
async def main ():
    task_set: asyncio.Task = asyncio.create_task (set_number ())
    await task_set
    # 2. 绑定回调函数,task 结束后自动调用 callback
    task_set.add_done_callback (callback)
if __name__ == '__main__':
    asyncio.run (main ())