ImQi1云笔记ImQi1云笔记
首页
  • 鸿蒙开发
  • HTML
  • CSS
  • JavaScript
  • TypeScript
  • Vue.js
  • Java速查
  • Python 速查
  • Python 异步编程
  • Django框架
  • FastAPI
  • Flask框架
  • PyQt6
  • Tornado框架
  • Linux基本命令
  • Linux Shell
  • Git命令速查
  • Docker 学习笔记
  • Nginx 学习笔记
  • MySQL
  • MongoDB
  • Redis
  • XPath
首页
  • 鸿蒙开发
  • HTML
  • CSS
  • JavaScript
  • TypeScript
  • Vue.js
  • Java速查
  • Python 速查
  • Python 异步编程
  • Django框架
  • FastAPI
  • Flask框架
  • PyQt6
  • Tornado框架
  • Linux基本命令
  • Linux Shell
  • Git命令速查
  • Docker 学习笔记
  • Nginx 学习笔记
  • MySQL
  • MongoDB
  • Redis
  • XPath
  • Python异步编程

    • 介绍
    • 前置知识
      • 生成器
      • 异步IO
    • 异步编程相关概念
      • asyncio 库
      • 事件循环
    • 快速上手
    • 正式学习
      • 协程对象
      • await 关键字
      • 协程嵌套
      • task 对象
      • 协程并发
      • future 对象
      • concurrent 中的 future 对象
      • 协程与线程和进程的交叉使用
      • 异步与非异步模块混合使用案例
      • 异步迭代器
      • 绑定回调
      • 阻塞与 await
      • 并发执行与同步执行
      • 异步上下文管理器
    • 实际使用
      • 异步 HTTP 请求
      • 异步MySQL
    • 补充
      • asyncio 模块的常用方法和类

Python 异步编程

介绍

在同一线程内,一段执行代码过程中,可以中断并跳转到另一段代码中,接着之前中断的地方继续执行。

协程运行状态于多线程类似。

协程优点:

  • 无需线程上下文切换,协程避免了无意义的调度,可以提高性能。
  • 无需原子操作锁定及同步开销。
  • 方便切换控制流,简化编程模型
  • 高并发 + 高扩展性 + 低成本,一个CPU支持上万的协程不是问题,很适合用于高并发处理。

协程缺点:

  • 无法利用多核资源。协程的本质是单线程,不能同时将单个CPU的多个核用上,协程需要进程配合才能运行在多CPU上。(不过我们日常编程不会有这个问题,除非是CPU密集型应用)
  • 进行阻塞操作(如IO时)会阻塞掉整个程序

前置知识

生成器

例:

def fun():
    print('---------start---------')
    while True:
        yield '生成器返回的数据....'


g = fun()
print(next(g))
print('*' * 20)
print(next(g))

运行结果:

---------start---------
生成器返回的数据....
********************
生成器返回的数据....

由以上代码得知生成器在第二个运行的时候会根据上一次运行的状态继续向下执行。利用这一个特性我们可以使用生成器进行任务之间的切换。

使用yield实现协程

import time


def func_a():
    while True:
        print('-------A-------')
        yield
        time.sleep(0.5)


def func_b(obj):
    while True:
        print('---------B------------')
        obj.__next__()


a = func_a()
func_b(a)

打印结果:

---------B------------
-------A-------
---------B------------
-------A-------
---------B------------
-------A-------
---------B------------

当前程序不断交替执行,完成了程序之间的切换。

异步IO

  • 同步:先执行第一个事务,如果遇到阻塞,则进行等待直到第一个事务执行完毕,再执行第二个事务。
import time


def foo():
    time.sleep(1)


now = lambda: time.time()
start = now()

for i in range(5):
    foo()

print('同步所花费的时间: %f s' % (now() - start))

打印结果:

同步所花费的时间: 5.004482 s
  • 异步:执行第一个事务之后,如果遇到阻塞,则会执行第二个事务,不会等待。可以通过状态、通知、回调来调用处理结果。
import time
import asyncio

now = lambda: time.time()

async def fun():
    await asyncio.sleep(1)  # 正确的异步写法
    print('我是async任务...')

async def main():
    tasks = [fun() for _ in range(5)]  # 创建多个异步任务
    await asyncio.gather(*tasks)  # 并发执行所有任务

start = now()	

asyncio.run(main())  # 启动主事件循环

print('异步所花费的时间: %f s' % (now() - start))

打印结果:

我是async任务...
我是async任务...
我是async任务...
我是async任务...
我是async任务...
异步所花费的时间: 1.000891 s

异步编程相关概念

asyncio 库

asyncio是python3.4之后引入的标准库,内置对异步IO的支持。asyncio的编程模型是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

  1. event_loop事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
  2. coroutine协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  3. task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  4. future: 代表将来执行或者没有执行的任务的结果,它和task没有本质上的区别
  5. async / await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

事件循环

理解成一个死循环,会去检测某些代码。

# 伪代码

任务列表 =[任务1、任务2....]

while True:
  可执行的任务列表, 执行完成的任务列表 = 去任务列表中检查所有的任务, 将“可执行”和“已完成”的任务返回
  
  for 就绪任务 in 可执行任务列表:
    执行就绪任务
  for 已完成的任务 in 执行完成的任务列表:
    任务列表中移除 已完成的任务
  
  如果 任务列表 中任务都完成,则终止循环

创建事件循环

import asyncio

# 异步任务1
async def work_1():
    for _ in range(5):
        print('我是异步任务1')
        await asyncio.sleep(1)

# 异步任务2
async def work_2():
    for _ in range(5):
        print('我是异步任务2')
        await asyncio.sleep(1)

tasks = [
    work_1(),
    work_2()
]

async def main():
    await asyncio.gather(*tasks)

asyncio.run(main())

快速上手

协程函数:定义函数的时候前面有async关键字。

协程对象:执行协程函数()得到的是协程对象。

只有一个协程时,可以这样运行:

import asyncio

# 定义一个异步函数
async def my_coroutine():
    print("Hello from the coroutine!")
    await asyncio.sleep(1)
    print("Goodbye from the coroutine!")

# 定义一个异步的 main 函数,直接运行协程
async def main():
    await my_coroutine()

# 运行 main 协程
asyncio.run(main())

或:

# 定义一个异步的 main 函数,使用 create_task 创建任务
async def main():
    task = asyncio.create_task(my_coroutine())  # 将协程包装为任务
    await task  # 等待任务完成

# 运行 main 协程
asyncio.run(main())

如果有多个协程:

import asyncio

# 定义异步任务
async def task_1():
    print("Task 1 starting...")
    await asyncio.sleep(1)
    print("Task 1 done.")

async def task_2():
    print("Task 2 starting...")
    await asyncio.sleep(1)
    print("Task 2 done.")

# 使用 asyncio.run 和 asyncio.gather 同时运行多个任务
async def main():
    await asyncio.gather(task_1(), task_2())

asyncio.run(main())

正式学习

协程对象

协程对象是 Python 中用于实现异步编程的基本构件,通常由 async 和 await 关键字来定义和使用。与普通的函数不同,协程函数(由 async def 定义)在被调用时不会立即执行,而是返回一个协程对象。你可以将这个协程对象当作一个未来事件来处理,它代表着某个异步操作的结果,通常需要通过事件循环来调度执行。

协程对象本身并不会立即执行,它的执行需要依赖于事件循环(Event Loop)。在 Python 中,通常会使用 asyncio.run() 或者 await 来启动协程。

协程对象有多种状态,最常见的包括:

  • PENDING(待执行):协程对象已经创建,但还未开始执行。
  • RUNNING(运行中):协程对象正在执行中。
  • DONE(已完成):协程对象的任务已经完成。

协程对象通常是通过 asyncio.create_task() 或 asyncio.ensure_future() 来转换为 Task 对象的,Task 对象是协程对象的包装,它在事件循环中执行并提供更丰富的功能(例如异常处理、任务完成后的回调等)。

属性/方法说明
__await__()获取协程对象的迭代器,在 await 语句中调用。通常不用直接调用。
cr_await返回正在等待的协程对象,用于调试协程。
cr_running如果协程正在执行,则返回 True,否则返回 False。
cr_done()判断协程是否已完成,返回一个布尔值。
cr_exception()返回协程中的异常(如果有的话)。如果协程尚未完成则抛出 RuntimeError。
cr_result()如果协程已完成并且没有异常,返回协程的结果。如果协程没有完成或抛出了异常,则抛出 RuntimeError。
create_task()用于将协程对象包装成一个 Task 对象,启动协程执行。
asyncio.run()运行协程并阻塞当前线程,直到协程完成。

await 关键字

await后面跟的是一个可等待的对象。await的作用就是等待后面的函数返回结果,只有后面的函数返回结果了,程序才会继续往下执行。

可等待的对象:

  • 协程对象
  • future对象
  • task对象

await在等待以上对象所返回的值。

import asyncio
from requests import get


def send_request():
    response = get("http://localhost:5000")
    return f"接收到响应:{response.text}"

async def main():
    response = await send_request()
    print(response)

asyncio.run(main())

协程嵌套

在一段协程中等待另一协程。

import asyncio

async def others():
    print('start')
    await asyncio.sleep(2)  # 当前等待的sleep对象的返回值为None
    print('end')
    return '返回值'

async def func():
    print('执行协程函数内部代码')
    # 遇到阻塞操作,挂起该协程。当阻塞操作完成后再继续往下执行
    # 当前协程挂起时,事件循环可以执行其他协程
    response = await others()  # others()是一个协程对象
    print('结果:', response)

asyncio.run(func())

打印信息:

执行协程函数内部代码
start
end
IO操作结果为: 返回值

一个协程函数中可以同时存在多个 await 关键字。

import asyncio

async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print('执行协程函数内部代码')
    response_1 = await others()
    print('response_1的IO操作结果为:', response_1)

    response_2 = await others()
    print('response_2的IO操作结果为:', response_2)

asyncio.run(func())

打印结果:

执行协程函数内部代码
start
end
response_1的IO操作结果为: 返回值
start
end
response_2的IO操作结果为: 返回值

代码先执行func中的print('执行协程函数内部代码'),然后遇到await,协程挂起。等待await后面的程序全部执行完毕并返回结果。发现此处await的是一个协程,程序进入协程others() 中,执行print('start'),又遇到了await,此次await的是一个IO操作,则程序挂起,去执行其他的协程。(这里没有其他的协程,于是只能老实等待)。asyncio.sleep(2)执行结束后,回到func()里面继续执行挂起之后的代码。response_1结束。response_2同理。

task 对象

在 Python 3.13 中,创建 Task 对象的推荐方法是使用 asyncio.create_task() 函数。该函数可以将一个协程对象包装成一个 Task 对象,并将其调度到事件循环中执行。使用方法非常简单,只需要将协程对象传递给 asyncio.create_task(),例如:asyncio.create_task(coroutine)。这种方式可以确保任务在后台异步执行,而不会阻塞主线程。对于大多数场景,asyncio.create_task() 是最直接、最简洁的选择。需要注意的是,asyncio.run() 也会自动处理协程封装成 Task 的工作,因此如果是启动整个事件循环并运行一个单一的协程,使用 asyncio.run(coroutine) 即可,无需显式创建 Task 对象。在 Python 3.13 中,手动实例化 Task 对象不再推荐,asyncio.create_task() 是标准且优雅的解决方案。

Task 对象的属性和方法:

属性/方法描述
Task.cancel()取消任务。如果任务已开始执行,它会尽量中断任务并抛出 CancelledError。
Task.cancelled()返回布尔值,表示任务是否已被取消。
Task.result()如果任务已成功执行并返回结果,则返回该结果。如果任务没有完成(如被取消或出现异常),会抛出 InvalidStateError。
Task.exception()如果任务抛出了异常,返回该异常对象。如果任务未抛出异常,则返回 None。
Task.done()返回布尔值,表示任务是否已完成(包括成功、异常或被取消)。
Task._loop返回任务所关联的事件循环(事件循环对象)。
Task._source_traceback如果任务抛出了异常,返回源头的 traceback(源代码跟踪)。
Task.get_stack()返回当前任务的调用堆栈列表。
Task._create()私有方法,用于创建任务。通常不直接调用,而是通过 asyncio.create_task() 或 asyncio.ensure_future() 来创建任务。
Task.__del__()当任务被销毁时调用的清理方法,通常不需要手动调用。

Task 对象具有如下生命周期:

任务创建:通过 asyncio.create_task() 或 asyncio.ensure_future() 创建任务。

任务运行:任务会在事件循环中异步运行。

任务取消:可以调用 cancel() 来请求取消任务,任务并非立即停止,而是标记为取消,接着可以抛出 CancelledError 异常。

任务完成:任务执行完毕后,可以通过 result() 或 exception() 获取任务结果或异常。

Task 对象有两种完成状态:

  • done():如果任务完成,无论是正常结束还是通过异常结束,都返回 True。
  • cancelled():如果任务被取消,返回 True。
import asyncio

async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'

async def func():
    print('执行协程函数内部代码')

    # python3.7以上版本写法
    task_1 = asyncio.create_task(others())
    task_2 = asyncio.create_task(others())

    response_1 = await task_1
    print('task_1的IO操作结果为:', response_1)
    response_2 = await task_2
    print('task_2的IO操作结果为:', response_2)

asyncio.run(func())

打印结果:

执行协程函数内部代码
start
start
(隔两秒)
end
end
task_1的IO操作结果为: 返回值
task_2的IO操作结果为: 返回值

发现这里程序运行的结果和前面在await后面加协程对象的时候不太一样。这是为什么呢?

主要原因是加了两个 asyncio.create_task()语句,添加了两个task对象。

程序执行的逻辑如下:

  • 创建事件循环asyncio.run()
  • 运行主函数func(),执行print(),创建task_1,创建task_2
  • 程序来到response_1 = await task_1,response_1一直在等待await后面的task_1传回返回值。至此,由于遇到了await,func()主程序挂起.
  • 进入task_1中执行。在await中执行print('start')后,遇到await阻塞,此时task_1挂起,等待await后面的程序执行完毕。与此同时,发现除了task_1外还存在另一个任务task_2,于是去执行task_2里面的程序。
  • 执行task_2中的print('start')后又遇到await等待,但此时已经没有其他任务了,因此只能等待task_1和task_2中挂起的程序执行完毕。
  • task_1先结束挂起,执行print('end'),并return返回值。谁先结束挂起,就先执行谁那边的后续代码
  • 在task_1执行完print('end')之后,return返回值之前,task_2也结束挂起了。执行了print('end'),return返回值。这里虽然返回了值,但是response_2只能等到await的时候才能接收到这个值
  • 函数返回至func()中,response_1获得了await task_1返回的值,执行了print()
  • response_2获得了await task_2的返回值,执行了print()

将程序改成如下的形式可以更加深刻理解程序执行的过程。

import asyncio

async def others(time, name):
    print('start', name)
    await asyncio.sleep(time)
    print('end', name)
    return '返回值' + name

async def func():
    print('执行协程函数内部代码')

    task_1 = asyncio.create_task(others(3, 'task_1'))
    task_2 = asyncio.create_task(others(2, 'task_2'))

    response_1 = await task_1
    print('task_1的IO操作结果为:', response_1)
    response_2 = await task_2
    print('task_2的IO操作结果为:', response_2)

asyncio.run(func())

打印结果:

执行协程函数内部代码
start task_1
start task_2
(隔三秒)
end task_2
end task_1
task_1的IO操作结果为: 返回值task_1
task_2的IO操作结果为: 返回值task_2

倘若交换task_1和task_2的sleep时间:

import asyncio

async def others(time, name):
    print('start', name)
    await asyncio.sleep(time)
    print('end', name)
    return '返回值' + name

async def func():
    print('执行协程函数内部代码')

    task_1 = asyncio.create_task(others(2, 'task_1'))
    task_2 = asyncio.create_task(others(3, 'task_2'))

    response_1 = await task_1
    print('task_1的IO操作结果为:', response_1)
    response_2 = await task_2
    print('task_2的IO操作结果为:', response_2)

asyncio.run(func())

执行协程函数内部代码
start task_1
start task_2
(隔两秒)
end task_1
task_1的IO操作结果为: 返回值task_1
(隔一秒)
end task_2
task_2的IO操作结果为: 返回值task_2

如果不使用 Task,比如改为如下的程序:

import asyncio

async def others(time, name):
       print('start', name)
       await asyncio.sleep(time)
       print('end', name)
       return '返回值' + name

async def func():
       print('执行协程函数内部代码')
    
       response_1 = await others(2, 'task_1')
       print('task_1的IO操作结果为:', response_1)
       response_2 = await others(3, 'task_2')
       print('task_2的IO操作结果为:', response_2)

asyncio.run(func())

程序不会并行执行,这一点需注意。

协程并发

以上代码示例的代码写法一般用的很少,只是为了便于理解,正式写法如下: 使用asyncio.wait(task_list)即可。

import time
import asyncio


# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)  # 模拟一个IO操作
    return 'Done after {}s'.format(x)


async def main():
    # 创建多个协程对象
    coroutine_1 = do_work(1)
    coroutine_2 = do_work(2)
    coroutine_3 = do_work(3)

    tasks = [# 在列表中创建多个task对象
        asyncio.create_task(coroutine_1), asyncio.create_task(coroutine_2),
        asyncio.create_task(coroutine_3)]

    # 使用 asyncio.wait 来等待任务完成,timeout=None 表示等待直到所有任务完成
    dones, pendings = await asyncio.wait(tasks, timeout=None)

    for task in dones:
        print('task result:', task.result())

    print("pendings:", pendings)


now = lambda: time.time()
start = now()

# 使用 asyncio.run 来启动协程和事件循环
asyncio.run(main())

print('Time:', now() - start)

打印结果:

waiting: 1
waiting: 2
waiting: 3
task result: Done after 1s
task result: Done after 2s
task result: Done after 3s
pendings: set()
Time: 3.002121925354004

我们可以在创建 task 时给 task 取名字。

coroutine_1 = do_work(1)
coroutine_2 = do_work(2)
coroutine_3 = do_work(3)

tasks = [
   asyncio.create_task(coroutine_1, name='MY_COROUTINE_1'),
   asyncio.create_task(coroutine_2, name='MY_COROUTINE_2'),
   asyncio.create_task(coroutine_3, name='MY_COROUTINE_3')
]

future 对象

future类是task类的基类,task对象只有运算得到返回值后,await的对象才能传回值并且向下运行。这个功能就是future对象来实现的。future源码中存在一个_state,一旦_state值变成finished,await就不再继续等待。future一般不手动去写它。

若执行以下代码,则程序一直会处于等待状态。(卡死)

import asyncio

async def main():
    # 创建一个任务(future对象),什么也不干
    fut = asyncio.Future()  # 创建一个 Future 对象
    # await会等待任务最终结果,没有结果则会一直等待下去
    await fut

asyncio.run(main())
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')

async def main():
    # 创建一个任务(future对象),什么也不干
    fut = asyncio.Future()  # 创建 Future 对象

    # 等待任务最终结果,没有结果则会一直等待下去
    # 手动设置future的最终结果,那样fut就可以结束了
    await asyncio.create_task(set_after(fut))  # 使用 create_task 来创建任务并调度执行
    data = await fut  # 等待future对象获取最终结果,否则一直等待下去
    print(data)

# 使用 asyncio.run() 来执行协程,避免手动管理事件循环
asyncio.run(main())

以上两段代码没有实际意义,只是加深对于future的理解。

concurrent 中的 future 对象

在python中还有一个concurrent模块,concurrent模块也有对应的java接口。concurrent.future.Future对象和asyncio.Future对象没有任何关系,是完全不同的两个东西。concurrent.futures.Future是在利用进程池、线程池来实现异步操作时来使用的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor  # 用线程执行异步
from concurrent.futures.process import ProcessPoolExecutor  # 用进程执行异步


def func(value):
    time.sleep(1)
    print(value)
    return 123


# 创建线程池
pool = ThreadPoolExecutor(max_workers=2)
# 或者创建进程池
# pool =  ProcessPoolExecutor(max_workers=5)

for i in range(4):
    fut = pool.submit(func, i)  # 返回的是一个Future对象, fut.result()才是返回结果
    print(fut)
    # print(fut.result())

打印结果:

<Future at 0x7f02f1bf2850 state=running>
<Future at 0x7f02f1a1b3a0 state=running>
<Future at 0x7f02f1a1b700 state=pending>
<Future at 0x7f02f1a1b820 state=pending>
1
0
3
2

这边可以发现fut对象是并行创建的。

协程与线程和进程的交叉使用

日后写代码可能会存在协程和线程、进程下的两种Future交叉使用的情况。 (在编程的时候,遇到某个第三方模块不支持协程异步的时候用) 一般情况下,不会交叉使用。在并发编程中,要么用asyncio这种协程式的来实现异步,要么统一用进程池或线程池来实现异步。有些情况下会交叉使用,例如:crm项目中80%是基于协程异步编程 + MySQL,这种情况下只有MySQL内部也支持async异步,两者才能实现无缝衔接。假如MySQL不支持协程,则需要考虑对此用线程、进程来实现了。

asyncio.run_in_executor()实现异步的内部原理:

  • 第一步:内部会先调用TreadPoolExector的submit方法去线程中申请一个func_1函数,并返回一个concurrent.futures.Future对象 (与asyncio没有任何关系)
  • 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装成asyncio.Future对象

因为concurrent.futures.Future对象不支持await语法,因此需要转换成asyncio.Future对象才能进行使用

import time
import asyncio
import concurrent.futures

def func_1():
    time.sleep(2)
    return '这是一个任务的返回值...'

async def main():
    loop = asyncio.get_running_loop()

    # 在协程函数中运行普通函数 在执行函数时,协程内部会自动创建一个线程池来运行任务
    # run_in_executor()方法第一个参数为None时则默认创建一个线程池
    fut = loop.run_in_executor(None, func_1)
    result = await fut
    print('当前方式会自动创建一个线程池去执行普通函数: ', result)

    # 在协程函数中运行基于线程池的任务, 效果与以上代码一致
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func_1)
        print('在线程池中得到的执行结果: ', result)

    # 在协程函数中运行基于进程池的任务
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func_1)
        print('在进程池中得到的执行结果: ', result)

if __name__ == "__main__":
    asyncio.run(main())

异步与非异步模块混合使用案例

import asyncio
import requests

# 异步下载图片函数
async def download_image(url):
    # 发送网络请求下载图片
    print('开始下载:', url)
    # request不支持异步,使用线程池来辅助实现
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('下载完成')
    
    # 图片保存
    file_name = url.rsplit('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response.content)

# 主程序
async def main():
    url_list = [
        'http://pic.bizhi360.com/bbpic/98/10798.jpg',
        'http://pic.bizhi360.com/bbpic/92/10792.jpg',
        'http://pic.bizhi360.com/bbpic/86/10386.jpg'
    ]
    
    # 使用 asyncio.create_task 来创建协程任务
    tasks = [asyncio.create_task(download_image(url)) for url in url_list]

    # 等待所有任务完成
    await asyncio.gather(*tasks)

# 使用 asyncio.run 来启动整个程序
if __name__ == '__main__':
    asyncio.run(main())

启动程序:

  • 程序从 asyncio.run(main()) 开始执行,asyncio.run() 会创建一个事件循环并开始执行 main() 协程。

创建协程任务:

  • main() 函数中定义了一个图片 URL 列表 (url_list)。

  • 然后通过列表推导式 [asyncio.create_task(download_image(url)) for url in url_list] 创建了 3 个异步任务,每个任务负责下载一个图片。

  • asyncio.create_task(download_image(url)) 会立刻将任务添加到事件循环中并开始执行相应的协程函数 download_image(url)。

download_image(url) 函数的执行:

  • 每个 download_image(url) 调用开始执行时,首先会打印 '开始下载: <url>',然后执行 requests.get(url) 来发送 HTTP 请求下载图片。
  • 由于 requests 库是同步的,而我们在异步代码中不能直接使用同步的阻塞 I/O 操作,因此通过 loop.run_in_executor(None, requests.get, url) 将 HTTP 请求操作放入一个线程池中运行。这使得 requests.get() 可以在不同的线程中执行,而不会阻塞主线程。
  • 在调用 await future 时,协程会挂起,等待 requests.get() 的执行结果(即图片下载完成),此时控制权会返回到事件循环,让其他任务得以执行。

并发执行:

  • 因为每个下载操作都通过 asyncio.create_task() 创建了任务,并且任务的执行是并发的,所以多个下载操作可以同时进行。
  • 当一个任务处于 await 阶段(比如等待 requests.get() 返回),事件循环可以继续执行其他任务。也就是说,当第一个任务在等待下载时,第二个和第三个任务也会继续执行,下载操作不会互相阻塞。

下载完成与保存:

  • 当 requests.get(url) 下载完成并返回响应时,future.set_result(response) 会使 await future 恢复执行,协程会继续执行后面的代码。
  • 每个任务都会打印 '下载完成',然后从响应中获取图片内容 (response.content) 并保存到本地文件中,文件名根据 URL 的最后一部分确定。

等待任务完成:

  • await asyncio.gather(*tasks) 等待所有的协程任务完成。asyncio.gather() 会并行执行所有传入的协程任务,直到所有任务都结束。
  • asyncio.gather() 返回一个结果列表,这里我们没有处理返回值,但你可以根据需要访问任务的返回值。

程序结束:

  • 当所有任务都完成后,asyncio.run(main()) 结束执行,程序退出。

异步迭代器

在 Python 中,异步迭代器是一个支持异步遍历的对象,可以用来处理需要异步操作的迭代场景。与普通的同步迭代器不同,异步迭代器中的数据获取通常会涉及到异步操作(如 I/O 操作),因此迭代过程本身是异步的。

异步迭代器的基本概念

一个异步迭代器需要实现以下两个方法:

aiter(): 返回一个异步迭代器对象。这个方法通常返回 self。

anext(): 这是异步迭代器的核心方法,它用于获取下一个值。它返回一个协程对象,因此需要用 await 来等待结果。如果没有更多的元素,anext() 会抛出 StopAsyncIteration 异常,表示迭代已经完成。

异步迭代器与同步迭代器的对比

同步迭代器:同步迭代器会通过 iter() 和 next() 方法来返回值,并且每次 next() 调用时会直接返回结果。

异步迭代器:异步迭代器需要通过 aiter() 和 anext() 方法来实现,其中 anext() 会返回一个协程对象,协程对象通过 await 等待结果。

示例:实现异步迭代器

假设我们需要模拟从数据库或网络中异步读取数据,我们可以通过异步迭代器来按需加载数据。

import asyncio

class AsyncCounter:
    def __init__(self, start, end):
        self.current = start
        self.end = end

    # 返回迭代器本身
    def __aiter__(self):
        return self
    
    # 获取下一个值,异步获取
    async def __anext__(self):
        if self.current > self.end:
            raise StopAsyncIteration  # 结束迭代
        await asyncio.sleep(1)  # 模拟异步操作(比如从数据库中读取)
        self.current += 1
        return self.current - 1

# 异步遍历数据
async def main():
    async for number in AsyncCounter(1, 5):
        print(f"Received number: {number}")

# 运行异步任务
asyncio.run(main())

解释:

AsyncCounter 类:

该类实现了一个异步迭代器,从 start 到 end 依次返回数字。

aiter() 方法返回迭代器本身,即 self。

anext() 方法返回下一个数字,并且模拟一个异步操作(通过 await asyncio.sleep(1) 模拟延迟)。

当 self.current 超过 end 时,通过 raise StopAsyncIteration 来终止迭代。

async for:

在 main() 函数中,使用 async for 循环来异步地遍历 AsyncCounter 实例。当调用 anext() 时,Python 会自动等待结果,并且在协程中执行异步操作。

运行:

asyncio.run(main()) 启动事件循环并运行 main() 协程。

示例:使用异步迭代器读取文件

假设我们有一个大文件,想要按行异步读取文件内容,可以使用异步迭代器来实现。

import asyncio

class AsyncFileReader:
    def __init__(self, filename):
        self.filename = filename

    def __aiter__(self):
        self.file = open(self.filename, 'r')
        return self
    
    async def __anext__(self):
        line = await asyncio.to_thread(self.file.readline)
        if not line:
            self.file.close()
            raise StopAsyncIteration
        return line.strip()

async def main():
    async for line in AsyncFileReader('sample.txt'):
        print(f"读取到的行:{line}")

asyncio.run(main())

解释:

AsyncFileReader 类:

该类实现了一个异步迭代器,用于异步按行读取文件。

在 aiter() 中,打开文件并返回迭代器本身。

在 anext() 中,使用 asyncio.to_thread() 将文件读取操作放入线程池中,以避免阻塞事件循环。

asyncio.to_thread() 是一个协程,它将同步的阻塞操作(如文件读取)转移到后台线程进行异步执行。

async for:

async for 循环用于异步遍历文件的每一行。anext() 会异步读取每一行,直到文件末尾。

异步迭代器常见场景

异步读取文件或数据库:当需要从文件或数据库中异步获取数据时,异步迭代器可以避免阻塞事件循环,允许程序执行其他任务。

网络请求:异步迭代器可用于按需从网络下载数据,而无需阻塞主线程。

异步任务管理:在处理多个异步任务时,异步迭代器可以按顺序返回每个任务的结果,特别适合需要分批处理的场景。

绑定回调

在task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值,如果回调需要多个参数,可以通过偏函数导入。

import asyncio

# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)
    return 'Done after {} s'.format(x)

# 给任务添加回调函数
def callback(future):
    print('callback:', future.result())

async def run_task():
    # 获取协程对象
    coroutine = do_work(3)
    # 创建task
    task = asyncio.create_task(coroutine)
    # 给task添加回调
    task.add_done_callback(callback)
    return await task

# 将协程对象加入到事件循环中
asyncio.run(run_task())  # 注意这里加上了括号,调用异步任务

回调一直是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码,以避免回调的问题。回调中我们使用了future的result()方法,前面不绑定回调的例子中,可以看到task的finished状态.在那个时候,可以直接读取task的result方法。

import asyncio

# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)
    return f'Done after {x} s'

# 获取协程对象
async def main():
    coroutine = do_work(3)
    # 创建task
    task = asyncio.create_task(coroutine)

    # 等待任务完成
    result = await task
    print('直接获取返回结果:', result)

# 使用asyncio.run来启动事件循环
asyncio.run(main())

阻塞与 await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器中的yield一样,函数交出控制权。协程遇到await,事件循环就会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再执行下一个协程。

import time
import asyncio

# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)  # 模拟一个IO操作
    return f'Done after {x} s'

async def main():
    now = lambda: time.time()
    start = now()
    # 获取协程对象
    coroutine = do_work(3)
    # 创建task
    task = asyncio.create_task(coroutine)
    # 将协程对象加入到事件循环中
    await task
    # 下面的程序会阻塞住,直到协程执行完毕
    print('task result:', task.result())
    print('Time:', now() - start)

asyncio.run(main())

打印结果:

waiting: 3
task result: Done after 3s
Time: 3.002943754196167

并发执行与同步执行

对创建的future对象进行变量赋值并进行await。

import time
import asyncio

now = lambda: time.time()


async def func():
    task_1 = asyncio.ensure_future(asyncio.sleep(1))
    task_2 = asyncio.ensure_future(asyncio.sleep(1))
    await task_1
    await task_2


async def main():
    start = now()
    for i in range(5):
        await func()
    print('异步所花费的时间: %f s' % (now() - start))

asyncio.run(main())

打印结果:

异步所花费的时间: 5.005247 s

直接运行future对象:

import time
import asyncio

now = lambda: time.time()


async def func():
    await asyncio.ensure_future(asyncio.sleep(1))
    await asyncio.ensure_future(asyncio.sleep(1))


async def main():
    start = now()
    for i in range(5):
        await func()
    print('异步所花费的时间: %f s' % (now() - start))

asyncio.run(main())

打印结果:

异步所花费的时间: 10.010090 s

直接对asyncio.ensure_future()创建的future对象进行await是不能实现并发的,必须将创建的对象返回给一个变量,在对该变量绑定的future对象进行await才可实现。更好的方法是使用asyncio.gather()方法或者aysncio.wait()方法来实现并发

异步上下文管理器

这种对象通过定义__aenter__()和__aexit__()方法来对asyncio with语句中的环境进行控制,叫做异步上下文管理器。

import asyncio

class AsyncContextManager:
    async def do_something(self):
        return '模拟业务操作'

    async def __aenter__(self):
        print("进入上下文管理器")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exec_type, exc_val, exc_tb):
        """
         with语句运行结束之后触发此方法的运行
         exc_type:如果抛出异常, 这里获取异常类型
         exc_val:如果抛出异常, 这里显示异常内容
         exc_tb:如果抛出异常, 这里显示所在位置, traceback
         """
        print("退出上下文管理器")
        await asyncio.sleep(1)

async def main():
    async with AsyncContextManager() as fp:
        result = await fp.do_something()
        print(result)

asyncio.run(main())

实际使用

异步 HTTP 请求

asyncio 可以与 aiohttp 配合使用,进行异步 HTTP 请求。相比于传统的阻塞请求,aiohttp 提供了非阻塞的网络通信,能够显著提高并发请求的性能。

import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "http://example.com"
    html = await fetch(url)
    print(html)

asyncio.run(main())

异步MySQL

# pip install aiomysql

import asyncio
import aiomysql

async def execute():
    # 网络IO操作,连接mysql
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='root', db='test')
    # 网络IO操作,创建cursor()
    cur = await conn.cursor()
    # 网络IO操作,执行sql语句
    await cur.execute('SELECT host.user FROM user')
    # 网络IO操作,获取sql结果
    result = cur.fetchall()
    print(result)
    # 网络IO操作,关闭链接
    await cur.close()
    conn.close()

asyncio.run(execute())
import asyncio
import aiomysql

async def execute(host, password):
    # 网络IO操作,连接mysql
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='test')
    # 网络IO操作,创建cursor()
    cur = await conn.cursor()
    # 网络IO操作,执行sql语句
    await cur.execute('SELECT host.user FROM user')
    # 网络IO操作,获取sql结果
    result = cur.fetchall()
    print(result)
    # 网络IO操作,关闭链接
    await cur.close()
    conn.close()

async def main():
    task_list = [
        asyncio.ensure_future(execute('127.0.0.1', 'root')),
        asyncio.ensure_future(execute('40.0.0.1', 'root'))
    ]

    await asyncio.wait(task_list)

asyncio.run(execute())

补充

asyncio 模块的常用方法和类

方法/类功能描述
asyncio.run(coroutine)用于运行最高层级的协程,创建事件循环并执行协程,直到其完成。
asyncio.create_task(coroutine)创建一个任务(Task),该任务表示一个待执行的协程,任务会被调度到事件循环中运行。
asyncio.ensure_future(coroutine)将协程转换为 Future 对象并将其调度到事件循环,功能与 create_task 类似,但允许传递未来的 Future 对象。
asyncio.gather(*coroutines)并行运行多个协程并收集其结果。返回一个 Future 对象,可以用 await 等待多个协程的执行结果。
asyncio.wait(futures, timeout=None)等待多个 Future 对象中的某些完成,返回一个元组,包括完成的任务和未完成的任务。
asyncio.sleep(seconds)异步休眠,非阻塞,等待指定的时间(秒)。该方法通常用于模拟 I/O 操作或延时。
asyncio.get_event_loop()获取当前线程的事件循环对象。该方法在 Python 3.10 之前是常用的,建议使用 asyncio.get_running_loop()。
asyncio.get_running_loop()获取当前运行中的事件循环,通常用于协程中。
asyncio.run_until_complete(future)在事件循环中运行直到 Future 对象完成,常用于 get_event_loop() 中。
asyncio.to_thread(func, *args)将阻塞性函数放入线程池中异步执行,适用于不支持异步的 I/O 操作(如数据库查询、文件读写)。
asyncio.CancelledError当任务被取消时抛出的异常。
asyncio.FutureFuture 对象表示尚未完成的异步操作,它是一个低级别的对象,可以显式控制任务的执行结果。
asyncio.TaskTask 类继承自 Future,表示一个正在执行的协程,允许通过 await 获取其结果。
asyncio.Event用于在协程之间同步的事件对象。可以通过 set() 或 clear() 来控制事件的状态,并在其他协程中通过 wait() 等待事件的发生。
asyncio.Queue异步队列对象,允许多个生产者和消费者协作,提供了 put() 和 get() 方法,用于在异步环境下的任务调度和同步。
asyncio.Lock异步锁,用于在多个协程之间控制资源的访问。不同于同步锁,它不会阻塞事件循环。
asyncio.Semaphore异步信号量,限制同时访问某些资源的协程数量,类似于锁,但允许指定同时执行的最大任务数量。
asyncio.Condition异步条件变量,用于在协程之间同步操作,通常与 Lock 或 Semaphore 一起使用,以确保条件满足时才继续执行。
asyncio.StreamReader用于处理异步读取流的类,通常与 StreamWriter 一起使用,用于异步处理流式数据(如网络流)。
asyncio.StreamWriter用于处理异步写入流的类,通常与 StreamReader 一起使用,用于异步写入流式数据(如网络流)。
asyncio.SubprocessProtocol用于管理和控制子进程的协议类,能够与子进程进行异步交互。
asyncio.SubprocessStreamReader用于从子进程的标准输出或标准错误流中异步读取数据。
asyncio.SubprocessStreamWriter用于向子进程的标准输入流中异步写入数据。
asyncio.shield(coroutine)用于防止任务在执行过程中被取消。
上次更新: 1/3/2025, 7:17:27 AM