跳至主要內容

Python异步编程

大约 20 分钟约 6030 字

协程概念

在同一线程内,一段执行代码过程中,可以中断并跳转到另一段代码中,接着之前中断的地方继续执行。

协程运行状态于多线程类似。

协程优点:

  • 无需线程上下文切换,协程避免了无意义的调度,可以提高性能。
  • 无需原子操作锁定及同步开销。
  • 方便切换控制流,简化编程模型
  • 高并发 + 高扩展性 + 低成本,一个CPU支持上万的协程不是问题,很适合用于高并发处理。

协程缺点:

  • 无法利用多核资源。协程的本质是单线程,不能同时将单个CPU的多个核用上,协程需要进程配合才能运行在多CPU上。(不过我们日常编程不会有这个问题,除非是CPU密集型应用)
  • 进行阻塞操作(如IO时)会阻塞掉整个程序

对生成器的回顾

生成器示例代码

def fun():
    print('---------start---------')
    while True:
        yield '生成器返回的数据....'


g = fun()
print(next(g))
print('*' * 20)
print(next(g))

打印结果:

---------start---------
生成器返回的数据....
********************
生成器返回的数据....

由以上代码得知生成器在第二个运行的时候会根据上一次运行的状态继续向下执行。利用这一个特性我们可以使用生成器进行任务之间的切换。

使用yield实现协程

import time


def func_a():
    while True:
        print('-------A-------')
        yield
        time.sleep(0.5)


def func_b(obj):
    while True:
        print('---------B------------')
        obj.__next__()


a = func_a()
func_b(a)

打印结果:

---------B------------
-------A-------
---------B------------
-------A-------
---------B------------
-------A-------
---------B------------

当前程序不断交替执行,完成了程序之间的切换。

异步IO

  • 同步:先执行第一个事务,如果遇到阻塞,则进行等待直到第一个事务执行完毕,再执行第二个事务。
import time


def foo():
    time.sleep(1)


now = lambda: time.time()
start = now()

for i in range(5):
    foo()

print('同步所花费的时间: %f s' % (now() - start))

打印结果:

同步所花费的时间: 5.004482 s
  • 异步:执行第一个事务之后,如果遇到阻塞,则会执行第二个事务,不会等待。可以通过状态、通知、回调来调用处理结果。
import time
import asyncio

now = lambda: time.time()


async def fun():
    asyncio.sleep(1)
    print('我是async任务...')


start = now()

for _ in range(5):
    asyncio.run(fun())

print('异步所花费的时间: %f s' % (now() - start))

打印结果:

我是async任务...
我是async任务...
我是async任务...
我是async任务...
我是async任务...
异步所花费的时间: 0.003563 s

asyncio

asynciopython3.4之后引入的标准库,内置对异步IO的支持。asyncio的编程模型是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

  1. event_loop事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。
  2. coroutine协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
  3. task任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
  4. future: 代表将来执行或者没有执行的任务的结果,它和task没有本质上的区别
  5. async / await关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。

事件循环

理解成一个死循环,会去检测某些代码。

# 伪代码

任务列表 =[任务1、任务2....]

while True:
  可执行的任务列表, 执行完成的任务列表 = 去任务列表中检查所有的任务, 将“可执行”和“已完成”的任务返回
  
  for 就绪任务 in 可执行任务列表:
    执行就绪任务
  for 已完成的任务 in 执行完成的任务列表:
    任务列表中移除 已完成的任务
  
  如果 任务列表 中任务都完成,则终止循环

创建事件循环

import asyncio


# 异步任务1
async def work_1():
    for _ in range(5):
        print('我是异步任务1')
        await asyncio.sleep(1)


# 异步任务2
async def work_2():
    for _ in range(5):
        print('我是异步任务2')
        await asyncio.sleep(1)


# 将多个任务存放在一个列表中
tasks = [
    work_1(),
    work_2()
]

# 创建或获取一个事件循环
loop = asyncio.get_event_loop()

# 将任务列表放到事件循环
loop.run_until_complete(asyncio.wait(tasks))

"""
当前代码在python3.7后被asyncio.run替代。无需手动创建事件循环
"""

快速上手

协程函数:定义函数的时候前面有async关键字

协程对象:执行协程函数()得到的是协程对象

创建协程

async def do_work():
    print('学习协程中...')


result = do_work()

以上代码会获取一个协程对象。内部代码不会被运行

使用事件循环运行协程对象

loop = asyncio.get_event_loop()
loop.run_until_complete(result)

以上代码为python3.6的写法,在python3.7后使用run()方法来执行协程对象

asyncio.run(result)

await关键字

await后面跟的是一个可等待的对象。await的作用就是等待后面的函数返回结果,只有后面的函数返回结果了,程序才会继续往下执行。

可等待的对象:

  • 协程对象
  • future对象
  • task对象

await在等待以上对象所返回的值。

协程嵌套

在一个协程函数中执行其他的协程函数

import asyncio


async def others():
    print('start')
    await asyncio.sleep(2)  # 当前等待的sleep对象的返回值为None
    print('end')
    return '返回值'


async def func():
    print('执行协程函数内部代码')
    # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行
    # 当前协程挂起时,事件循环可以执行其他协程
    response = await others()  # others()是一个协程对象
    print('IO操作结果为:', response)


# python3.6的写法
# loop=asyncio.get_event_loop()
# loop.run_until_complete(func())

# python3.7以上版本的写法
asyncio.run(func())

打印信息:

执行协程函数内部代码
start
end
IO操作结果为: 返回值

一个协程函数中可以同时存在多个await

import asyncio


async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'


async def func():
    print('执行协程函数内部代码')
    # 遇到IO操作,挂起该协程。当IO操作完成后再继续往下执行
    # 当前协程挂起时,事件循环可以执行其他协程
    response_1 = await others()
    print('response_1的IO操作结果为:', response_1)

    response_2 = await others()
    print('response_2的IO操作结果为:', response_2)


# python3.6的写法
loop = asyncio.get_event_loop()
loop.run_until_complete(func())

# python3.7及以上写法
# asyncio.run(func())

打印结果:

执行协程函数内部代码
start
end
response_1的IO操作结果为: 返回值
start
end
response_2的IO操作结果为: 返回值

代码先执行func中的print('执行协程函数内部代码'),然后遇到await,协程挂起。等待await后面的程序全部执行完毕并返回结果。发现此处await的是一个协程,程序进入协程others() 中,执行print('start'),又遇到了await,此次await的是一个IO操作,则程序挂起,去执行其他的协程。(这里没有其他的协程,于是只能老实等待)。asyncio.sleep(2)执行结束后,回到func()里面继续执行挂起之后的代码。response_1结束。response_2同理。

task对象

创建一个task对象

task对象就是往事件循环中加入任务用的。Task用于开发调度协程,通过asyncio.create_task(协程对象)创建(python3.7之后有这个函数),也可以用**asyncio.ensure_future(coroutine)loop.create_task(coroutine)创建一个task**。run_until_complete的参数是一个future对象,当传入一个协程,其内部会自动封装成task。不建议手动实例化task对象。

import asyncio


async def others():
    print('start')
    await asyncio.sleep(2)
    print('end')
    return '返回值'


async def func():
    print('执行协程函数内部代码')

    # python3.6写法
    # task_1 = asyncio.ensure_future(others())
    # task_2 = asyncio.ensure_future(others())

    # python3.7以上版本写法
    task_1 = asyncio.create_task(others())
    task_2 = asyncio.create_task(others())

    response_1 = await task_1
    print('task_1的IO操作结果为:', response_1)
    response_2 = await task_2
    print('task_2的IO操作结果为:', response_2)


# python3.6写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(func())

# python3.7以上版本写法
asyncio.run(func())

打印结果:

执行协程函数内部代码
start
start
end
end
task_1的IO操作结果为: 返回值
task_2的IO操作结果为: 返回值

发现这里程序运行的结果和前面在await后面加协程对象的时候不太一样。这是为什么呢?

主要原因是加了两个 asyncio.create_task()语句,添加了两个task对象。

  • 程序执行的逻辑如下:
    • 创建事件循环asyncio.get_event_loop()
    • 运行主函数func(),执行print(),创建task_1,创建task_2
    • 程序来到response_1 = await task_1response_1一直在等待await后面的task_1传回返回值。至此,由于遇到了awaitfunc()主程序挂起.
    • 进入task_1中执行。在await中执行print('start')后,遇到await阻塞,此时task_1挂起,等待await后面的程序执行完毕。与此同时,发现除了task_1外还存在另一个任务task_2,于是去执行task_2里面的程序。
    • 执行task_2中的print('start')后又遇到await等待,但此时已经没有其他任务了,因此只能等待task_1task_2中挂起的程序执行完毕。
    • task_1先结束挂起,执行print('end'),并return返回值。谁先结束挂起,就先执行谁那边的后续代码
    • task_1执行完print('end')之后,return返回值之前,task_2也结束挂起了。执行了print('end')return返回值。这里虽然返回了值,但是response_2只能等到await的时候才能接收到这个值
    • 函数返回至func()中,response_1获得了await task_1返回的值,执行了print()
    • response_2获得了await task_2的返回值,执行了print()

将程序改成如下的形式可以更加深刻理解程序执行的过程

import asyncio


async def others(time, name):
    print('start', name)
    await asyncio.sleep(time)
    print('end', name)
    return '返回值' + name


async def func():
    print('执行协程函数内部代码')

    task_1 = asyncio.ensure_future(others(3, 'task_1'))
    task_2 = asyncio.ensure_future(others(2, 'task_2'))
    response_1 = await task_1
    print('task_1的IO操作结果为:', response_1)
    response_2 = await task_2
    print('task_2的IO操作结果为:', response_2)


loop = asyncio.get_event_loop()
loop.run_until_complete(func())

"""
随堂测试:
    将以上代码修改成python3.7以上版本的写法
"""

打印结果:

执行协程函数内部代码
start task_1
start task_2
end task_2  # 这边task_2比task_1先结束,这里之所以不会先print('task_2的IO操作结果为:', response_2)是因为func()函数需要先等response_1结束挂起才能执行后续。
end task_1
task_1的IO操作结果为: 返回值task_1
task_2的IO操作结果为: 返回值task_2

倘若交换task_1task_2sleep时间

import asyncio  

async def others(time, name):
  print('start', name)
  await asyncio.sleep(time)
  print('end', name)
  return '返回值' + name

async def func():
  print('执行协程函数内部代码')

  task_1 = asyncio.ensure_future(others(2, 'task_1')) 
  task_2 = asyncio.ensure_future(others(3, 'task_2'))
  response_1 = await task_1
  print('task_1的IO操作结果为:', response_1)
  response_2 = await task_2
  print('task_2的IO操作结果为:', response_2)

loop = asyncio.get_event_loop()
loop.run_until_complete(func())
执行协程函数内部代码
start task_1
start task_2
end task_1
task_1的IO操作结果为: 返回值task_1
end task_2
task_2的IO操作结果为: 返回值task_2

协程并发

以上代码示例的代码写法一般用的很少,只是为了便于理解,正式写法如下: 使用asyncio.wait(task_list)即可。

import time
import asyncio


# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)  # 模拟一个IO操作
    return 'Done after {}s'.format(x)


async def main():
    # 创建多个协程对象
    coroutine_1 = do_work(1)
    coroutine_2 = do_work(2)
    coroutine_3 = do_work(3)

    tasks = [
        # 在列表中创建多个future对象
        asyncio.ensure_future(coroutine_1),
        asyncio.ensure_future(coroutine_2),
        asyncio.ensure_future(coroutine_3)
    ]

    # 用gather()方法也行
    # time_out=2 表示最多等2秒,假如time_out=None,则表示默认等到所有结果都完成
    dones, pendings = await asyncio.wait(tasks, timeout=None)

    for task in dones:
        print('task result:', task.result())


now = lambda: time.time()
start = now()
# 创建事件循环
loop = asyncio.get_event_loop()
# 将协程对象加入到事件循环中
loop.run_until_complete(main())

print('Time:', now() - start)

打印结果:

waiting: 1
waiting: 2
waiting: 3
task result: Done after 1s
task result: Done after 2s
task result: Done after 3s
Time: 3.0020153522491455

python 3.7之后我们可以在创建task的时候给task起名字

coroutine_1 = do_work(1)
coroutine_2 = do_work(2)
coroutine_3 = do_work(3)

tasks = [
   asyncio.create_task(coroutine_1, name='MY_COROUTINE_1'),
   asyncio.create_task(coroutine_2, name='MY_COROUTINE_2'),
   asyncio.create_task(coroutine_3, name='MY_COROUTINE_3')
]

asynciofuture对象

future类是task类的基类,task对象只有运算得到返回值后,await的对象才能传回值并且向下运行。这个功能就是future对象来实现的。future源码中存在一个_state,一旦_state值变成finishedawait就不再继续等待。future一般不手动去写它。

若执行以下代码,则程序一直会处于等待状态。(卡死)

import asyncio  

async def main():
   # 获取当前事件循环
   loop = asyncio.get_running_loop() # python3.7以后
   # 创建一个任务(future对象),什么也不干
   fut = loop.create_future()
   # await会等待任务最终结果,没有结果则会一直等待下去
   await fut

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
import asyncio


async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')


async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()  # python3.7以后
    # 创建一个任务(future对象),什么也不干
    fut = loop.create_future()
    # 等待任务最终结果,没有结果则会一直等待下去
    # 手动设置future的最终结果,那样fut就可以结束了
    await loop.create_task(set_after(fut))
    data = await fut  # 等待future对象获取最终结果,否则一直等待下去
    print(data)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

以上两段代码没有实际意义,只是加深对于future的理解。

concurrent中的future对象

python中还有一个concurrent模块,concurrent模块也有对应的java接口。concurrent.future.Future对象和asyncio.Future对象没有任何关系,是完全不同的两个东西。concurrent.futures.Future是在利用进程池、线程池来实现异步操作时来使用的对象。

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor  # 用线程执行异步
from concurrent.futures.process import ProcessPoolExecutor  # 用进程执行异步


def func(value):
    time.sleep(1)
    print(value)
    return 123


# 创建线程池
pool = ThreadPoolExecutor(max_workers=2)
# 或者创建进程池
# pool =  ProcessPoolExecutor(max_workers=5)

for i in range(4):
    fut = pool.submit(func, i)  # 返回的是一个Future对象, fut.result()才是返回结果
    print(fut)
    # print(fut.result())

打印结果:

<Future at 0x7f02f1bf2850 state=running>
<Future at 0x7f02f1a1b3a0 state=running>
<Future at 0x7f02f1a1b700 state=pending>
<Future at 0x7f02f1a1b820 state=pending>
1
0
3
2

这边可以发现fut对象是并行创建的。

协程与线程和进程的交叉使用

日后写代码可能会存在协程和线程、进程下的两种Future交叉使用的情况。 (在编程的时候,遇到某个第三方模块不支持协程异步的时候用) 一般情况下,不会交叉使用。在并发编程中,要么用asyncio这种协程式的来实现异步,要么统一用进程池或线程池来实现异步。有些情况下会交叉使用,例如:crm项目中80%是基于协程异步编程 + MySQL,这种情况下只有MySQL内部也支持async异步,两者才能实现无缝衔接。假如MySQL不支持协程,则需要考虑对此用线程、进程来实现了。

  • asyncio.run_in_executor()实现异步的内部原理
    • 第一步:内部会先调用TreadPoolExectorsubmit方法去线程中申请一个func_1函数,并返回一个concurrent.futures.Future对象 (与asyncio没有任何关系)
    • 第二步:调用asyncio.wrap_futureconcurrent.futures.Future对象包装成asyncio.Future对象

因为concurrent.futures.Future对象不支持await语法,因此需要转换成asyncio.Future对象才能进行使用

import time
import asyncio
import concurrent.futures


def func_1():
    time.sleep(2)
    return '这是一个任务的返回值...'


async def main():
    loop = asyncio.get_running_loop()

    # 在协程函数中运行普通函数 在执行函数时,协程内部会自动创建一个线程池来运行任务
    # run_in_executor()方法第一个参数为None时则默认创建一个线程池
    fut = loop.run_in_executor(None, func_1)
    result = await fut
    print('当前方式会自动创建一个线程池去执行普通函数: ', result)

    # 在协程函数中运行基于线程池的任务, 效果与以上代码一致
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func_1)
        print('在线程池中得到的执行结果: ', result)

    # 在协程函数中运行基于进程池的任务
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, func_1)
        print('在进程池中得到的执行结果: ', result)


if __name__ == "__main__":
    asyncio.run(main())

异步与非异步模块混合使用案例

import asyncio
import requests


async def download_image(url):
    # 发送网络请求下载图片
    print('开始下载:', url)
    loop = asyncio.get_running_loop()
    # request不支持异步,使用线程池来辅助实现
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('下载完成')
    # 图片保存
    file_name = url.rsplit('/')[-1]
    with open(file_name, 'wb') as f:
        f.write(response.content)


if __name__ == '__main__':
    url_list = [
        'http://pic.bizhi360.com/bbpic/98/10798.jpg',
        'http://pic.bizhi360.com/bbpic/92/10792.jpg',
        'http://pic.bizhi360.com/bbpic/86/10386.jpg'
    ]

    loop = asyncio.get_event_loop()

    # 使用事件循环对象批量创建task对象
    tasks = [loop.create_task(download_image(url)) for url in url_list]
    loop.run_until_complete(asyncio.wait(tasks))

上述代码运行逻辑:

  • 一开始创建了三个url的任务
  • 针对第一个url,执行download_image()函数,执行run_in_executor(),下面遇到await
  • 遇到await后协程挂起,去执行第二个url的任务,又创建一个线程,继续await,然后转向第三个url,然后继续创建线程,又等待。

以上的程序和纯依靠asyncio支持的协程实现异步存在一个区别就是,纯aysncio实现协程只存在一个线程,所有的I/O等待都是协程在等待。而上述过程,在实现异步的过程中,建立了三个线程,I/O等待是线程在进行等待,因此有较大的资源浪费。

异步迭代器 - 了解即可

异步迭代器

实现了__aiter__()__anext__()方法的对象,而__anext__()必须返回一个awaitable对象,async for会处理异步迭代器的__anext__()方法所返回的可等待对象,直到其引发一个StopAsyncIteration异常。

异步迭代对象

可在async for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous iterator

import asyncio


class Reader:
    """
    自定义异步迭代器(同时也是异步可迭代对象)
    """

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        value = await self.readline()
        if value == None:
            raise StopAsyncIteration
        return value

上述代码不能直接使用async for执行

async for i in Reader():
    print(i)

打印结果:

  File "/workspace/python_code/协程备课.py", line 24
    async for i in Reader():
    ^
SyntaxError: 'async for' outside async function

async for必须运行在一个协程函数内

async def main():
    async for i in Reader():
        print(i)

asyncio.run(main())

绑定回调

task执行完毕的时候可以获取执行的结果,回调的最后一个参数是future对象,通过该对象可以获取协程返回值,如果回调需要多个参数,可以通过偏函数导入。

import asyncio


# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    return 'Done after {} s'.format(x)


# 获取协程对象
coroutine = do_work(3)
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建task
task = loop.create_task(coroutine)


# 给任务添加回调函数
def callback(future):
    print('callback:', future.result())


task.add_done_callback(callback)

# 将协程对象加入到事件循环中
loop.run_until_complete(task)

futureresult

回调一直是很多异步编程的噩梦,程序员更喜欢用同步的编写方式写异步代码,以避免回调的问题。回调中我们使用了futureresult()方法,前面不绑定回调的例子中,可以看到taskfinished状态.在那个时候,可以直接读取taskresult方法。

import asyncio


# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    return f'Done after {x} s'


# 获取协程对象
coroutine = do_work(3)
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建task
task = loop.create_task(coroutine)

# 将协程对象加入到事件循环中
loop.run_until_complete(task)
# 直接调用task中的result()来获取返回结果
print('直接获取返回结果:', task.result())

阻塞与await

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器中的yield一样,函数交出控制权。协程遇到await,事件循环就会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再执行下一个协程。

import time
import asyncio


# 使用async 来定义一个协程对象
async def do_work(x):
    print('waiting:', x)
    await asyncio.sleep(x)  # 模拟一个IO操作
    return f'Done after {x} s'


now = lambda: time.time()
start = now()
# 获取协程对象
coroutine = do_work(3)
# 创建事件循环
loop = asyncio.get_event_loop()
# 创建task
task = loop.create_task(coroutine)
# 将协程对象加入到事件循环中
loop.run_until_complete(task)
print('task result:', task.result())
print('Time:', now() - start)

打印结果:

waiting: 3
task result: Done after 3s
Time: 3.002943754196167

并发执行与同步执行

对创建的future对象进行变量赋值并进行await

import time
import asyncio

now = lambda: time.time()


async def func():
    task_1 = asyncio.ensure_future(asyncio.sleep(1))
    task_2 = asyncio.ensure_future(asyncio.sleep(1))
    await task_1
    await task_2


start = now()
loop = asyncio.get_event_loop()

for i in range(5):
    loop.run_until_complete(func())

print('异步所花费的时间: %f s' % (now() - start))

打印结果:

异步所花费的时间: 5.005247 s

直接运行future对象

import time
import asyncio

now = lambda: time.time()


async def func():
    await asyncio.ensure_future(asyncio.sleep(1))
    await asyncio.ensure_future(asyncio.sleep(1))


start = now()
loop = asyncio.get_event_loop()

for i in range(5):
    loop.run_until_complete(func())

print('异步所花费的时间: %f s' % (now() - start))

打印结果:

异步所花费的时间: 10.010090 s

直接对asyncio.ensure_future()创建的future对象进行await是不能实现并发的,必须将创建的对象返回给一个变量,在对该变量绑定的future对象进行await才可实现。更好的方法是使用asyncio.gather()方法或者aysncio.wait()方法来实现并发

异步上下文管理器

这种对象通过定义__aenter__()__aexit__()方法来对asyncio with语句中的环境进行控制,叫做异步上下文管理器。

import asyncio


class AsyncContextManager:
    def __init__(self, conn=None):
        self.conn = conn

    async def do_something(self):
        # 异步读取数据库操作
        return '模拟增删改查数据库并返回操作结果...'

    async def __aenter__(self):
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exec_type, exc_val, exc_tb):
        """
         with语句运行结束之后触发此方法的运行
         exc_type:如果抛出异常, 这里获取异常类型
         exc_val:如果抛出异常, 这里显示异常内容
         exc_tb:如果抛出异常, 这里显示所在位置, traceback
         """
        # 异步关闭数据库连接
        await asyncio.sleep(1)


async def func():
    async with AsyncContextManager() as fp:
        result = await fp.do_something()
        print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(func())

uvloop

uvloopasyncio事件循环的替代方案,是一个第三方的库,使用uvloop可以在一定程度上提高事件循环的效率。uvloop事件循环效率 > 默认asyncio的事件循环效率,比其他框架效率至少提高两倍,性能比肩go语言。uvloop貌似暂不支持windows

import asyncio
import uvloop

# 将asyncio中的事件循环替换成uvloop中的事件循环
asyncio.set_event_loop(uvloop.EventLoopPolicy())
# 上下代码和以前一样
...

loop = asyncio.get_event_loop()
loop.run_until_complete()

实战案例

异步MySQL

# pip install aiomysql

import asyncio
import aiomysql


async def execute():
    # 网络IO操作,连接mysql
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='root', db='test')
    # 网络IO操作,创建cursor()
    cur = await conn.cursor()
    # 网络IO操作,执行sql语句
    await cur.execute('SELECT host.user FROM user')
    # 网络IO操作,获取sql结果
    result = cur.fetchall()
    print(result)
    # 网络IO操作,关闭链接
    await cur.close()
    conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(execute)

import asyncio
import aiomysql


async def execute(host, password):
    # 网络IO操作,连接mysql
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='test')
    # 网络IO操作,创建cursor()
    cur = await conn.cursor()
    # 网络IO操作,执行sql语句
    await cur.execute('SELECT host.user FROM user')
    # 网络IO操作,获取sql结果
    result = cur.fetchall()
    print(result)
    # 网络IO操作,关闭链接
    await cur.close()
    conn.close()


async def main():
    task_list = [
        asyncio.ensure_future(execute('127.0.0.1', 'root')),
        asyncio.ensure_future(execute('40.0.0.1', 'root'))
    ]

    await asyncio.wait(task_list)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

课程总结

异步编程最大的意义:通过一个线程来实现并发行为。利用其IO等待的时间来做其他的事情

上次编辑于:
贡献者: QI