Python 异步编程
介绍
在同一线程内,一段执行代码过程中,可以中断并跳转到另一段代码中,接着之前中断的地方继续执行。
协程运行状态于多线程类似。
协程优点:
- 无需线程上下文切换,协程避免了无意义的调度,可以提高性能。
- 无需原子操作锁定及同步开销。
- 方便切换控制流,简化编程模型
- 高并发 + 高扩展性 + 低成本,一个CPU支持上万的协程不是问题,很适合用于高并发处理。
协程缺点:
- 无法利用多核资源。协程的本质是单线程,不能同时将单个CPU的多个核用上,协程需要进程配合才能运行在多CPU上。(不过我们日常编程不会有这个问题,除非是CPU密集型应用)
- 进行阻塞操作(如IO时)会阻塞掉整个程序
前置知识
生成器
例:
def fun():
print('---------start---------')
while True:
yield '生成器返回的数据....'
g = fun()
print(next(g))
print('*' * 20)
print(next(g))
运行结果:
---------start---------
生成器返回的数据....
********************
生成器返回的数据....
由以上代码得知生成器在第二个运行的时候会根据上一次运行的状态继续向下执行。利用这一个特性我们可以使用生成器进行任务之间的切换。
使用
yield
实现协程
import time
def func_a():
while True:
print('-------A-------')
yield
time.sleep(0.5)
def func_b(obj):
while True:
print('---------B------------')
obj.__next__()
a = func_a()
func_b(a)
打印结果:
---------B------------
-------A-------
---------B------------
-------A-------
---------B------------
-------A-------
---------B------------
当前程序不断交替执行,完成了程序之间的切换。
异步IO
- 同步:先执行第一个事务,如果遇到阻塞,则进行等待直到第一个事务执行完毕,再执行第二个事务。
import time
def foo():
time.sleep(1)
now = lambda: time.time()
start = now()
for i in range(5):
foo()
print('同步所花费的时间: %f s' % (now() - start))
打印结果:
同步所花费的时间: 5.004482 s
- 异步:执行第一个事务之后,如果遇到阻塞,则会执行第二个事务,不会等待。可以通过状态、通知、回调来调用处理结果。
import time
import asyncio
now = lambda: time.time()
async def fun():
await asyncio.sleep(1) # 正确的异步写法
print('我是async任务...')
async def main():
tasks = [fun() for _ in range(5)] # 创建多个异步任务
await asyncio.gather(*tasks) # 并发执行所有任务
start = now()
asyncio.run(main()) # 启动主事件循环
print('异步所花费的时间: %f s' % (now() - start))
打印结果:
我是async任务...
我是async任务...
我是async任务...
我是async任务...
我是async任务...
异步所花费的时间: 1.000891 s
异步编程相关概念
asyncio 库
asyncio
是python3.4
之后引入的标准库,内置对异步IO的支持。asyncio
的编程模型是一个消息循环,我们从asyncio
模块中直接获取一个EventLoop
的引用,然后把需要执行的协程扔到EventLoop
中执行,就实现了异步IO。
event_loop
事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。coroutine
协程:协程对象,指一个使用async
关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。task
任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。future
: 代表将来执行或者没有执行的任务的结果,它和task
没有本质上的区别async / await
关键字:python3.5
用于定义协程的关键字,async
定义一个协程,await
用于挂起阻塞的异步调用接口。
事件循环
理解成一个死循环,会去检测某些代码。
# 伪代码
任务列表 =[任务1、任务2....]
while True:
可执行的任务列表, 执行完成的任务列表 = 去任务列表中检查所有的任务, 将“可执行”和“已完成”的任务返回
for 就绪任务 in 可执行任务列表:
执行就绪任务
for 已完成的任务 in 执行完成的任务列表:
任务列表中移除 已完成的任务
如果 任务列表 中任务都完成,则终止循环
创建事件循环
import asyncio
# 异步任务1
async def work_1():
for _ in range(5):
print('我是异步任务1')
await asyncio.sleep(1)
# 异步任务2
async def work_2():
for _ in range(5):
print('我是异步任务2')
await asyncio.sleep(1)
tasks = [
work_1(),
work_2()
]
async def main():
await asyncio.gather(*tasks)
asyncio.run(main())
快速上手
协程函数:定义函数的时候前面有async
关键字。
协程对象:执行协程函数()
得到的是协程对象。
只有一个协程时,可以这样运行:
import asyncio
# 定义一个异步函数
async def my_coroutine():
print("Hello from the coroutine!")
await asyncio.sleep(1)
print("Goodbye from the coroutine!")
# 定义一个异步的 main 函数,直接运行协程
async def main():
await my_coroutine()
# 运行 main 协程
asyncio.run(main())
或:
# 定义一个异步的 main 函数,使用 create_task 创建任务
async def main():
task = asyncio.create_task(my_coroutine()) # 将协程包装为任务
await task # 等待任务完成
# 运行 main 协程
asyncio.run(main())
如果有多个协程:
import asyncio
# 定义异步任务
async def task_1():
print("Task 1 starting...")
await asyncio.sleep(1)
print("Task 1 done.")
async def task_2():
print("Task 2 starting...")
await asyncio.sleep(1)
print("Task 2 done.")
# 使用 asyncio.run 和 asyncio.gather 同时运行多个任务
async def main():
await asyncio.gather(task_1(), task_2())
asyncio.run(main())
正式学习
协程对象
协程对象是 Python 中用于实现异步编程的基本构件,通常由 async
和 await
关键字来定义和使用。与普通的函数不同,协程函数(由 async def
定义)在被调用时不会立即执行,而是返回一个协程对象。你可以将这个协程对象当作一个未来事件来处理,它代表着某个异步操作的结果,通常需要通过事件循环来调度执行。
协程对象本身并不会立即执行,它的执行需要依赖于事件循环(Event Loop)。在 Python 中,通常会使用 asyncio.run()
或者 await
来启动协程。
协程对象有多种状态,最常见的包括:
- PENDING(待执行):协程对象已经创建,但还未开始执行。
- RUNNING(运行中):协程对象正在执行中。
- DONE(已完成):协程对象的任务已经完成。
协程对象通常是通过 asyncio.create_task()
或 asyncio.ensure_future()
来转换为 Task
对象的,Task
对象是协程对象的包装,它在事件循环中执行并提供更丰富的功能(例如异常处理、任务完成后的回调等)。
属性/方法 | 说明 |
---|---|
__await__() | 获取协程对象的迭代器,在 await 语句中调用。通常不用直接调用。 |
cr_await | 返回正在等待的协程对象,用于调试协程。 |
cr_running | 如果协程正在执行,则返回 True ,否则返回 False 。 |
cr_done() | 判断协程是否已完成,返回一个布尔值。 |
cr_exception() | 返回协程中的异常(如果有的话)。如果协程尚未完成则抛出 RuntimeError 。 |
cr_result() | 如果协程已完成并且没有异常,返回协程的结果。如果协程没有完成或抛出了异常,则抛出 RuntimeError 。 |
create_task() | 用于将协程对象包装成一个 Task 对象,启动协程执行。 |
asyncio.run() | 运行协程并阻塞当前线程,直到协程完成。 |
await 关键字
await
后面跟的是一个可等待的对象。await
的作用就是等待后面的函数返回结果,只有后面的函数返回结果了,程序才会继续往下执行。
可等待的对象:
- 协程对象
future
对象task
对象
await
在等待以上对象所返回的值。
import asyncio
from requests import get
def send_request():
response = get("http://localhost:5000")
return f"接收到响应:{response.text}"
async def main():
response = await send_request()
print(response)
asyncio.run(main())
协程嵌套
在一段协程中等待另一协程。
import asyncio
async def others():
print('start')
await asyncio.sleep(2) # 当前等待的sleep对象的返回值为None
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
# 遇到阻塞操作,挂起该协程。当阻塞操作完成后再继续往下执行
# 当前协程挂起时,事件循环可以执行其他协程
response = await others() # others()是一个协程对象
print('结果:', response)
asyncio.run(func())
打印信息:
执行协程函数内部代码
start
end
IO操作结果为: 返回值
一个协程函数中可以同时存在多个 await
关键字。
import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
response_1 = await others()
print('response_1的IO操作结果为:', response_1)
response_2 = await others()
print('response_2的IO操作结果为:', response_2)
asyncio.run(func())
打印结果:
执行协程函数内部代码
start
end
response_1的IO操作结果为: 返回值
start
end
response_2的IO操作结果为: 返回值
代码先执行func
中的print('执行协程函数内部代码')
,然后遇到await
,协程挂起。等待await
后面的程序全部执行完毕并返回结果。发现此处await
的是一个协程,程序进入协程others()
中,执行print('start')
,又遇到了await
,此次await
的是一个IO操作,则程序挂起,去执行其他的协程。(这里没有其他的协程,于是只能老实等待)。asyncio.sleep(2)
执行结束后,回到func()
里面继续执行挂起之后的代码。response_1
结束。response_2
同理。
task 对象
在 Python 3.13 中,创建 Task
对象的推荐方法是使用 asyncio.create_task()
函数。该函数可以将一个协程对象包装成一个 Task
对象,并将其调度到事件循环中执行。使用方法非常简单,只需要将协程对象传递给 asyncio.create_task()
,例如:asyncio.create_task(coroutine)
。这种方式可以确保任务在后台异步执行,而不会阻塞主线程。对于大多数场景,asyncio.create_task()
是最直接、最简洁的选择。需要注意的是,asyncio.run()
也会自动处理协程封装成 Task
的工作,因此如果是启动整个事件循环并运行一个单一的协程,使用 asyncio.run(coroutine)
即可,无需显式创建 Task
对象。在 Python 3.13 中,手动实例化 Task
对象不再推荐,asyncio.create_task()
是标准且优雅的解决方案。
Task 对象的属性和方法:
属性/方法 | 描述 |
---|---|
Task.cancel() | 取消任务。如果任务已开始执行,它会尽量中断任务并抛出 CancelledError 。 |
Task.cancelled() | 返回布尔值,表示任务是否已被取消。 |
Task.result() | 如果任务已成功执行并返回结果,则返回该结果。如果任务没有完成(如被取消或出现异常),会抛出 InvalidStateError 。 |
Task.exception() | 如果任务抛出了异常,返回该异常对象。如果任务未抛出异常,则返回 None 。 |
Task.done() | 返回布尔值,表示任务是否已完成(包括成功、异常或被取消)。 |
Task._loop | 返回任务所关联的事件循环(事件循环对象)。 |
Task._source_traceback | 如果任务抛出了异常,返回源头的 traceback(源代码跟踪)。 |
Task.get_stack() | 返回当前任务的调用堆栈列表。 |
Task._create() | 私有方法,用于创建任务。通常不直接调用,而是通过 asyncio.create_task() 或 asyncio.ensure_future() 来创建任务。 |
Task.__del__() | 当任务被销毁时调用的清理方法,通常不需要手动调用。 |
Task 对象具有如下生命周期:
任务创建:通过 asyncio.create_task()
或 asyncio.ensure_future()
创建任务。
任务运行:任务会在事件循环中异步运行。
任务取消:可以调用 cancel()
来请求取消任务,任务并非立即停止,而是标记为取消,接着可以抛出 CancelledError
异常。
任务完成:任务执行完毕后,可以通过 result()
或 exception()
获取任务结果或异常。
Task 对象有两种完成状态:
- done():如果任务完成,无论是正常结束还是通过异常结束,都返回
True
。 - cancelled():如果任务被取消,返回
True
。
import asyncio
async def others():
print('start')
await asyncio.sleep(2)
print('end')
return '返回值'
async def func():
print('执行协程函数内部代码')
# python3.7以上版本写法
task_1 = asyncio.create_task(others())
task_2 = asyncio.create_task(others())
response_1 = await task_1
print('task_1的IO操作结果为:', response_1)
response_2 = await task_2
print('task_2的IO操作结果为:', response_2)
asyncio.run(func())
打印结果:
执行协程函数内部代码
start
start
(隔两秒)
end
end
task_1的IO操作结果为: 返回值
task_2的IO操作结果为: 返回值
发现这里程序运行的结果和前面在await
后面加协程对象的时候不太一样。这是为什么呢?
主要原因是加了两个 asyncio.create_task()
语句,添加了两个task
对象。
程序执行的逻辑如下:
- 创建事件循环
asyncio.run()
- 运行主函数
func()
,执行print()
,创建task_1
,创建task_2
- 程序来到
response_1 = await task_1
,response_1
一直在等待await
后面的task_1
传回返回值。至此,由于遇到了await
,func()
主程序挂起. - 进入
task_1
中执行。在await
中执行print('start')
后,遇到await
阻塞,此时task_1
挂起,等待await
后面的程序执行完毕。与此同时,发现除了task_1
外还存在另一个任务task_2
,于是去执行task_2
里面的程序。 - 执行
task_2
中的print('start'
)后又遇到await
等待,但此时已经没有其他任务了,因此只能等待task_1
和task_2
中挂起的程序执行完毕。 task_1
先结束挂起,执行print('end')
,并return
返回值。谁先结束挂起,就先执行谁那边的后续代码- 在
task_1
执行完print('end')
之后,return
返回值之前,task_2
也结束挂起了。执行了print('end')
,return
返回值。这里虽然返回了值,但是response_2
只能等到await
的时候才能接收到这个值 - 函数返回至
func()
中,response_1
获得了await task_1
返回的值,执行了print()
response_2
获得了await task_2
的返回值,执行了print()
将程序改成如下的形式可以更加深刻理解程序执行的过程。
import asyncio
async def others(time, name):
print('start', name)
await asyncio.sleep(time)
print('end', name)
return '返回值' + name
async def func():
print('执行协程函数内部代码')
task_1 = asyncio.create_task(others(3, 'task_1'))
task_2 = asyncio.create_task(others(2, 'task_2'))
response_1 = await task_1
print('task_1的IO操作结果为:', response_1)
response_2 = await task_2
print('task_2的IO操作结果为:', response_2)
asyncio.run(func())
打印结果:
执行协程函数内部代码
start task_1
start task_2
(隔三秒)
end task_2
end task_1
task_1的IO操作结果为: 返回值task_1
task_2的IO操作结果为: 返回值task_2
倘若交换task_1
和task_2
的sleep
时间:
import asyncio
async def others(time, name):
print('start', name)
await asyncio.sleep(time)
print('end', name)
return '返回值' + name
async def func():
print('执行协程函数内部代码')
task_1 = asyncio.create_task(others(2, 'task_1'))
task_2 = asyncio.create_task(others(3, 'task_2'))
response_1 = await task_1
print('task_1的IO操作结果为:', response_1)
response_2 = await task_2
print('task_2的IO操作结果为:', response_2)
asyncio.run(func())
执行协程函数内部代码
start task_1
start task_2
(隔两秒)
end task_1
task_1的IO操作结果为: 返回值task_1
(隔一秒)
end task_2
task_2的IO操作结果为: 返回值task_2
如果不使用 Task,比如改为如下的程序:
import asyncio async def others(time, name): print('start', name) await asyncio.sleep(time) print('end', name) return '返回值' + name async def func(): print('执行协程函数内部代码') response_1 = await others(2, 'task_1') print('task_1的IO操作结果为:', response_1) response_2 = await others(3, 'task_2') print('task_2的IO操作结果为:', response_2) asyncio.run(func())
程序不会并行执行,这一点需注意。
协程并发
以上代码示例的代码写法一般用的很少,只是为了便于理解,正式写法如下: 使用asyncio.wait(task_list)
即可。
import time
import asyncio
# 使用async 来定义一个协程对象
async def do_work(x):
print('waiting:', x)
await asyncio.sleep(x) # 模拟一个IO操作
return 'Done after {}s'.format(x)
async def main():
# 创建多个协程对象
coroutine_1 = do_work(1)
coroutine_2 = do_work(2)
coroutine_3 = do_work(3)
tasks = [# 在列表中创建多个task对象
asyncio.create_task(coroutine_1), asyncio.create_task(coroutine_2),
asyncio.create_task(coroutine_3)]
# 使用 asyncio.wait 来等待任务完成,timeout=None 表示等待直到所有任务完成
dones, pendings = await asyncio.wait(tasks, timeout=None)
for task in dones:
print('task result:', task.result())
print("pendings:", pendings)
now = lambda: time.time()
start = now()
# 使用 asyncio.run 来启动协程和事件循环
asyncio.run(main())
print('Time:', now() - start)
打印结果:
waiting: 1
waiting: 2
waiting: 3
task result: Done after 1s
task result: Done after 2s
task result: Done after 3s
pendings: set()
Time: 3.002121925354004
我们可以在创建 task 时给 task 取名字。
coroutine_1 = do_work(1)
coroutine_2 = do_work(2)
coroutine_3 = do_work(3)
tasks = [
asyncio.create_task(coroutine_1, name='MY_COROUTINE_1'),
asyncio.create_task(coroutine_2, name='MY_COROUTINE_2'),
asyncio.create_task(coroutine_3, name='MY_COROUTINE_3')
]
future 对象
future
类是task
类的基类,task
对象只有运算得到返回值后,await
的对象才能传回值并且向下运行。这个功能就是future
对象来实现的。future
源码中存在一个_state
,一旦_state
值变成finished
,await
就不再继续等待。future
一般不手动去写它。
若执行以下代码,则程序一直会处于等待状态。(卡死)
import asyncio
async def main():
# 创建一个任务(future对象),什么也不干
fut = asyncio.Future() # 创建一个 Future 对象
# await会等待任务最终结果,没有结果则会一直等待下去
await fut
asyncio.run(main())
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('666')
async def main():
# 创建一个任务(future对象),什么也不干
fut = asyncio.Future() # 创建 Future 对象
# 等待任务最终结果,没有结果则会一直等待下去
# 手动设置future的最终结果,那样fut就可以结束了
await asyncio.create_task(set_after(fut)) # 使用 create_task 来创建任务并调度执行
data = await fut # 等待future对象获取最终结果,否则一直等待下去
print(data)
# 使用 asyncio.run() 来执行协程,避免手动管理事件循环
asyncio.run(main())
以上两段代码没有实际意义,只是加深对于future
的理解。
concurrent 中的 future 对象
在python
中还有一个concurrent
模块,concurrent
模块也有对应的java
接口。concurrent.future.Future
对象和asyncio.Future
对象没有任何关系,是完全不同的两个东西。concurrent.futures.Future
是在利用进程池、线程池来实现异步操作时来使用的对象。
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor # 用线程执行异步
from concurrent.futures.process import ProcessPoolExecutor # 用进程执行异步
def func(value):
time.sleep(1)
print(value)
return 123
# 创建线程池
pool = ThreadPoolExecutor(max_workers=2)
# 或者创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(4):
fut = pool.submit(func, i) # 返回的是一个Future对象, fut.result()才是返回结果
print(fut)
# print(fut.result())
打印结果:
<Future at 0x7f02f1bf2850 state=running>
<Future at 0x7f02f1a1b3a0 state=running>
<Future at 0x7f02f1a1b700 state=pending>
<Future at 0x7f02f1a1b820 state=pending>
1
0
3
2
这边可以发现fut对象是并行创建的。
协程与线程和进程的交叉使用
日后写代码可能会存在协程和线程、进程下的两种Future
交叉使用的情况。 (在编程的时候,遇到某个第三方模块不支持协程异步的时候用) 一般情况下,不会交叉使用。在并发编程中,要么用asyncio
这种协程式的来实现异步,要么统一用进程池或线程池来实现异步。有些情况下会交叉使用,例如:crm
项目中80%是基于协程异步编程 + MySQL
,这种情况下只有MySQL
内部也支持async
异步,两者才能实现无缝衔接。假如MySQL
不支持协程,则需要考虑对此用线程、进程来实现了。
asyncio.run_in_executor()
实现异步的内部原理:
- 第一步:内部会先调用
TreadPoolExector
的submit
方法去线程中申请一个func_1
函数,并返回一个concurrent.futures.Future
对象 (与asyncio
没有任何关系) - 第二步:调用
asyncio.wrap_future
将concurrent.futures.Future
对象包装成asyncio.Future
对象
因为concurrent.futures.Future
对象不支持await
语法,因此需要转换成asyncio.Future
对象才能进行使用
import time
import asyncio
import concurrent.futures
def func_1():
time.sleep(2)
return '这是一个任务的返回值...'
async def main():
loop = asyncio.get_running_loop()
# 在协程函数中运行普通函数 在执行函数时,协程内部会自动创建一个线程池来运行任务
# run_in_executor()方法第一个参数为None时则默认创建一个线程池
fut = loop.run_in_executor(None, func_1)
result = await fut
print('当前方式会自动创建一个线程池去执行普通函数: ', result)
# 在协程函数中运行基于线程池的任务, 效果与以上代码一致
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, func_1)
print('在线程池中得到的执行结果: ', result)
# 在协程函数中运行基于进程池的任务
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, func_1)
print('在进程池中得到的执行结果: ', result)
if __name__ == "__main__":
asyncio.run(main())
异步与非异步模块混合使用案例
import asyncio
import requests
# 异步下载图片函数
async def download_image(url):
# 发送网络请求下载图片
print('开始下载:', url)
# request不支持异步,使用线程池来辅助实现
loop = asyncio.get_running_loop()
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('下载完成')
# 图片保存
file_name = url.rsplit('/')[-1]
with open(file_name, 'wb') as f:
f.write(response.content)
# 主程序
async def main():
url_list = [
'http://pic.bizhi360.com/bbpic/98/10798.jpg',
'http://pic.bizhi360.com/bbpic/92/10792.jpg',
'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]
# 使用 asyncio.create_task 来创建协程任务
tasks = [asyncio.create_task(download_image(url)) for url in url_list]
# 等待所有任务完成
await asyncio.gather(*tasks)
# 使用 asyncio.run 来启动整个程序
if __name__ == '__main__':
asyncio.run(main())
启动程序:
- 程序从
asyncio.run(main())
开始执行,asyncio.run()
会创建一个事件循环并开始执行main()
协程。
创建协程任务:
main()
函数中定义了一个图片 URL 列表 (url_list
)。然后通过列表推导式
[asyncio.create_task(download_image(url)) for url in url_list]
创建了 3 个异步任务,每个任务负责下载一个图片。asyncio.create_task(download_image(url))
会立刻将任务添加到事件循环中并开始执行相应的协程函数download_image(url)
。
download_image(url)
函数的执行:
- 每个
download_image(url)
调用开始执行时,首先会打印'开始下载: <url>'
,然后执行requests.get(url)
来发送 HTTP 请求下载图片。 - 由于
requests
库是同步的,而我们在异步代码中不能直接使用同步的阻塞 I/O 操作,因此通过loop.run_in_executor(None, requests.get, url)
将 HTTP 请求操作放入一个线程池中运行。这使得requests.get()
可以在不同的线程中执行,而不会阻塞主线程。 - 在调用
await future
时,协程会挂起,等待requests.get()
的执行结果(即图片下载完成),此时控制权会返回到事件循环,让其他任务得以执行。
并发执行:
- 因为每个下载操作都通过
asyncio.create_task()
创建了任务,并且任务的执行是并发的,所以多个下载操作可以同时进行。 - 当一个任务处于
await
阶段(比如等待requests.get()
返回),事件循环可以继续执行其他任务。也就是说,当第一个任务在等待下载时,第二个和第三个任务也会继续执行,下载操作不会互相阻塞。
下载完成与保存:
- 当
requests.get(url)
下载完成并返回响应时,future.set_result(response)
会使await future
恢复执行,协程会继续执行后面的代码。 - 每个任务都会打印
'下载完成'
,然后从响应中获取图片内容 (response.content
) 并保存到本地文件中,文件名根据 URL 的最后一部分确定。
等待任务完成:
await asyncio.gather(*tasks)
等待所有的协程任务完成。asyncio.gather()
会并行执行所有传入的协程任务,直到所有任务都结束。asyncio.gather()
返回一个结果列表,这里我们没有处理返回值,但你可以根据需要访问任务的返回值。
程序结束:
- 当所有任务都完成后,
asyncio.run(main())
结束执行,程序退出。
异步迭代器
在 Python 中,异步迭代器是一个支持异步遍历的对象,可以用来处理需要异步操作的迭代场景。与普通的同步迭代器不同,异步迭代器中的数据获取通常会涉及到异步操作(如 I/O 操作),因此迭代过程本身是异步的。
异步迭代器的基本概念
一个异步迭代器需要实现以下两个方法:
aiter(): 返回一个异步迭代器对象。这个方法通常返回 self。
anext(): 这是异步迭代器的核心方法,它用于获取下一个值。它返回一个协程对象,因此需要用 await 来等待结果。如果没有更多的元素,anext() 会抛出 StopAsyncIteration 异常,表示迭代已经完成。
异步迭代器与同步迭代器的对比
同步迭代器:同步迭代器会通过 iter() 和 next() 方法来返回值,并且每次 next() 调用时会直接返回结果。
异步迭代器:异步迭代器需要通过 aiter() 和 anext() 方法来实现,其中 anext() 会返回一个协程对象,协程对象通过 await 等待结果。
示例:实现异步迭代器
假设我们需要模拟从数据库或网络中异步读取数据,我们可以通过异步迭代器来按需加载数据。
import asyncio
class AsyncCounter:
def __init__(self, start, end):
self.current = start
self.end = end
# 返回迭代器本身
def __aiter__(self):
return self
# 获取下一个值,异步获取
async def __anext__(self):
if self.current > self.end:
raise StopAsyncIteration # 结束迭代
await asyncio.sleep(1) # 模拟异步操作(比如从数据库中读取)
self.current += 1
return self.current - 1
# 异步遍历数据
async def main():
async for number in AsyncCounter(1, 5):
print(f"Received number: {number}")
# 运行异步任务
asyncio.run(main())
解释:
AsyncCounter 类:
该类实现了一个异步迭代器,从 start 到 end 依次返回数字。
aiter() 方法返回迭代器本身,即 self。
anext() 方法返回下一个数字,并且模拟一个异步操作(通过 await asyncio.sleep(1) 模拟延迟)。
当 self.current 超过 end 时,通过 raise StopAsyncIteration 来终止迭代。
async for:
在 main() 函数中,使用 async for 循环来异步地遍历 AsyncCounter 实例。当调用 anext() 时,Python 会自动等待结果,并且在协程中执行异步操作。
运行:
asyncio.run(main()) 启动事件循环并运行 main() 协程。
示例:使用异步迭代器读取文件
假设我们有一个大文件,想要按行异步读取文件内容,可以使用异步迭代器来实现。
import asyncio
class AsyncFileReader:
def __init__(self, filename):
self.filename = filename
def __aiter__(self):
self.file = open(self.filename, 'r')
return self
async def __anext__(self):
line = await asyncio.to_thread(self.file.readline)
if not line:
self.file.close()
raise StopAsyncIteration
return line.strip()
async def main():
async for line in AsyncFileReader('sample.txt'):
print(f"读取到的行:{line}")
asyncio.run(main())
解释:
AsyncFileReader 类:
该类实现了一个异步迭代器,用于异步按行读取文件。
在 aiter() 中,打开文件并返回迭代器本身。
在 anext() 中,使用 asyncio.to_thread() 将文件读取操作放入线程池中,以避免阻塞事件循环。
asyncio.to_thread() 是一个协程,它将同步的阻塞操作(如文件读取)转移到后台线程进行异步执行。
async for:
async for 循环用于异步遍历文件的每一行。anext() 会异步读取每一行,直到文件末尾。
异步迭代器常见场景
异步读取文件或数据库:当需要从文件或数据库中异步获取数据时,异步迭代器可以避免阻塞事件循环,允许程序执行其他任务。
网络请求:异步迭代器可用于按需从网络下载数据,而无需阻塞主线程。
异步任务管理:在处理多个异步任务时,异步迭代器可以按顺序返回每个任务的结果,特别适合需要分批处理的场景。
绑定回调
在task
执行完毕的时候可以获取执行的结果,回调的最后一个参数是future
对象,通过该对象可以获取协程返回值,如果回调需要多个参数,可以通过偏函数导入。
import asyncio
# 使用async 来定义一个协程对象
async def do_work(x):
print('waiting:', x)
await asyncio.sleep(x)
return 'Done after {} s'.format(x)
# 给任务添加回调函数
def callback(future):
print('callback:', future.result())
async def run_task():
# 获取协程对象
coroutine = do_work(3)
# 创建task
task = asyncio.create_task(coroutine)
# 给task添加回调
task.add_done_callback(callback)
return await task
# 将协程对象加入到事件循环中
asyncio.run(run_task()) # 注意这里加上了括号,调用异步任务
回调一直是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码,以避免回调的问题。回调中我们使用了future
的result()
方法,前面不绑定回调的例子中,可以看到task
的finished
状态.在那个时候,可以直接读取task
的result
方法。
import asyncio
# 使用async 来定义一个协程对象
async def do_work(x):
print('waiting:', x)
await asyncio.sleep(x)
return f'Done after {x} s'
# 获取协程对象
async def main():
coroutine = do_work(3)
# 创建task
task = asyncio.create_task(coroutine)
# 等待任务完成
result = await task
print('直接获取返回结果:', result)
# 使用asyncio.run来启动事件循环
asyncio.run(main())
阻塞与 await
使用async
可以定义协程对象,使用await
可以针对耗时的操作进行挂起,就像生成器中的yield
一样,函数交出控制权。协程遇到await
,事件循环就会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再执行下一个协程。
import time
import asyncio
# 使用async 来定义一个协程对象
async def do_work(x):
print('waiting:', x)
await asyncio.sleep(x) # 模拟一个IO操作
return f'Done after {x} s'
async def main():
now = lambda: time.time()
start = now()
# 获取协程对象
coroutine = do_work(3)
# 创建task
task = asyncio.create_task(coroutine)
# 将协程对象加入到事件循环中
await task
# 下面的程序会阻塞住,直到协程执行完毕
print('task result:', task.result())
print('Time:', now() - start)
asyncio.run(main())
打印结果:
waiting: 3
task result: Done after 3s
Time: 3.002943754196167
并发执行与同步执行
对创建的future
对象进行变量赋值并进行await
。
import time
import asyncio
now = lambda: time.time()
async def func():
task_1 = asyncio.ensure_future(asyncio.sleep(1))
task_2 = asyncio.ensure_future(asyncio.sleep(1))
await task_1
await task_2
async def main():
start = now()
for i in range(5):
await func()
print('异步所花费的时间: %f s' % (now() - start))
asyncio.run(main())
打印结果:
异步所花费的时间: 5.005247 s
直接运行future
对象:
import time
import asyncio
now = lambda: time.time()
async def func():
await asyncio.ensure_future(asyncio.sleep(1))
await asyncio.ensure_future(asyncio.sleep(1))
async def main():
start = now()
for i in range(5):
await func()
print('异步所花费的时间: %f s' % (now() - start))
asyncio.run(main())
打印结果:
异步所花费的时间: 10.010090 s
直接对asyncio.ensure_future()
创建的future
对象进行await
是不能实现并发的,必须将创建的对象返回给一个变量,在对该变量绑定的future
对象进行await
才可实现。更好的方法是使用asyncio.gather()
方法或者aysncio.wait()
方法来实现并发
异步上下文管理器
这种对象通过定义__aenter__()
和__aexit__()
方法来对asyncio with
语句中的环境进行控制,叫做异步上下文管理器。
import asyncio
class AsyncContextManager:
async def do_something(self):
return '模拟业务操作'
async def __aenter__(self):
print("进入上下文管理器")
await asyncio.sleep(1)
return self
async def __aexit__(self, exec_type, exc_val, exc_tb):
"""
with语句运行结束之后触发此方法的运行
exc_type:如果抛出异常, 这里获取异常类型
exc_val:如果抛出异常, 这里显示异常内容
exc_tb:如果抛出异常, 这里显示所在位置, traceback
"""
print("退出上下文管理器")
await asyncio.sleep(1)
async def main():
async with AsyncContextManager() as fp:
result = await fp.do_something()
print(result)
asyncio.run(main())
实际使用
异步 HTTP 请求
asyncio
可以与 aiohttp
配合使用,进行异步 HTTP 请求。相比于传统的阻塞请求,aiohttp
提供了非阻塞的网络通信,能够显著提高并发请求的性能。
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
url = "http://example.com"
html = await fetch(url)
print(html)
asyncio.run(main())
异步MySQL
# pip install aiomysql
import asyncio
import aiomysql
async def execute():
# 网络IO操作,连接mysql
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='root', db='test')
# 网络IO操作,创建cursor()
cur = await conn.cursor()
# 网络IO操作,执行sql语句
await cur.execute('SELECT host.user FROM user')
# 网络IO操作,获取sql结果
result = cur.fetchall()
print(result)
# 网络IO操作,关闭链接
await cur.close()
conn.close()
asyncio.run(execute())
import asyncio
import aiomysql
async def execute(host, password):
# 网络IO操作,连接mysql
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='test')
# 网络IO操作,创建cursor()
cur = await conn.cursor()
# 网络IO操作,执行sql语句
await cur.execute('SELECT host.user FROM user')
# 网络IO操作,获取sql结果
result = cur.fetchall()
print(result)
# 网络IO操作,关闭链接
await cur.close()
conn.close()
async def main():
task_list = [
asyncio.ensure_future(execute('127.0.0.1', 'root')),
asyncio.ensure_future(execute('40.0.0.1', 'root'))
]
await asyncio.wait(task_list)
asyncio.run(execute())
补充
asyncio 模块的常用方法和类
方法/类 | 功能描述 |
---|---|
asyncio.run(coroutine) | 用于运行最高层级的协程,创建事件循环并执行协程,直到其完成。 |
asyncio.create_task(coroutine) | 创建一个任务(Task ),该任务表示一个待执行的协程,任务会被调度到事件循环中运行。 |
asyncio.ensure_future(coroutine) | 将协程转换为 Future 对象并将其调度到事件循环,功能与 create_task 类似,但允许传递未来的 Future 对象。 |
asyncio.gather(*coroutines) | 并行运行多个协程并收集其结果。返回一个 Future 对象,可以用 await 等待多个协程的执行结果。 |
asyncio.wait(futures, timeout=None) | 等待多个 Future 对象中的某些完成,返回一个元组,包括完成的任务和未完成的任务。 |
asyncio.sleep(seconds) | 异步休眠,非阻塞,等待指定的时间(秒)。该方法通常用于模拟 I/O 操作或延时。 |
asyncio.get_event_loop() | 获取当前线程的事件循环对象。该方法在 Python 3.10 之前是常用的,建议使用 asyncio.get_running_loop() 。 |
asyncio.get_running_loop() | 获取当前运行中的事件循环,通常用于协程中。 |
asyncio.run_until_complete(future) | 在事件循环中运行直到 Future 对象完成,常用于 get_event_loop() 中。 |
asyncio.to_thread(func, *args) | 将阻塞性函数放入线程池中异步执行,适用于不支持异步的 I/O 操作(如数据库查询、文件读写)。 |
asyncio.CancelledError | 当任务被取消时抛出的异常。 |
asyncio.Future | Future 对象表示尚未完成的异步操作,它是一个低级别的对象,可以显式控制任务的执行结果。 |
asyncio.Task | Task 类继承自 Future ,表示一个正在执行的协程,允许通过 await 获取其结果。 |
asyncio.Event | 用于在协程之间同步的事件对象。可以通过 set() 或 clear() 来控制事件的状态,并在其他协程中通过 wait() 等待事件的发生。 |
asyncio.Queue | 异步队列对象,允许多个生产者和消费者协作,提供了 put() 和 get() 方法,用于在异步环境下的任务调度和同步。 |
asyncio.Lock | 异步锁,用于在多个协程之间控制资源的访问。不同于同步锁,它不会阻塞事件循环。 |
asyncio.Semaphore | 异步信号量,限制同时访问某些资源的协程数量,类似于锁,但允许指定同时执行的最大任务数量。 |
asyncio.Condition | 异步条件变量,用于在协程之间同步操作,通常与 Lock 或 Semaphore 一起使用,以确保条件满足时才继续执行。 |
asyncio.StreamReader | 用于处理异步读取流的类,通常与 StreamWriter 一起使用,用于异步处理流式数据(如网络流)。 |
asyncio.StreamWriter | 用于处理异步写入流的类,通常与 StreamReader 一起使用,用于异步写入流式数据(如网络流)。 |
asyncio.SubprocessProtocol | 用于管理和控制子进程的协议类,能够与子进程进行异步交互。 |
asyncio.SubprocessStreamReader | 用于从子进程的标准输出或标准错误流中异步读取数据。 |
asyncio.SubprocessStreamWriter | 用于向子进程的标准输入流中异步写入数据。 |
asyncio.shield(coroutine) | 用于防止任务在执行过程中被取消。 |