@[TOC](如何让你的Python 并发任务性能更好)
## 关于 asyncio.gather、asyncio.as_completed 和 asyncio.wait 的最佳实践
我们知道,由于 GIL,Python 的多线程性能从未达到预期。因此,从python从 3.4 版本开始,Python 引入了 asyncio 包,通过并发并发来执行 IO 绑定的任务。经过多次迭代,asyncio API 运行得非常好,与多线程版本相比,并发任务的性能得到了显著提高。
## 但是,程序员在使用 asyncio 时仍然会犯很多错误:
如下所示,一个错误是直接使用 await 协程方法,将对并发任务的调用从异步更改为同步,最终失去并发功能。
“`python
async def main():
result_1 = await some_coro(“name-1”)
result_2 = await some_coro(“name-2”)
“`
另一个错误如下所示,尽管程序员意识到他需要使用 create_task 来创建要在后台执行的任务。但是,下面这种逐个 Wait 任务的方式,将不同时间的任务变成了有序等待。
“`python
async def main():
task_1 = asyncio.create_task(some_coro(“name-1”))
task_2 = asyncio.create_task(some_coro(“name-2”))
result_1 = await task_1
result_2 = await task_2“`
“`
> 此代码将等待 task_1 先完成,而不管task_2是否先完成。
## So, 什么是并发任务执行?
那么,什么是真正的并发任务呢?让我们用一个图表来说明:

如图所示,并发进程应由两部分组成:启动后台任务、将后台任务重新加入主函数以及获取结果。
> 大多数读者已经知道如何使用 create_task 来启动后台任务。今天,我将介绍几种等待后台任务完成的方法以及每种方法的最佳实践。
## 让我们开始吧! Start……..
> 在我们开始介绍今天的主角之前,我们需要准备一个示例异步方法来模拟 IO 绑定的方法调用,以及一个自定义 AsyncException,该异常可用于在测试引发异常时友好地提示异常消息:
“`python
from random import random, randint
import asyncio
class AsyncException(Exception):
def __init__(self, message, *args, **kwargs):
self.message = message
super(*args, **kwargs)
def __str__(self):
return self.message
async def some_coro(name):
print(f”Coroutine {name} begin to run”)
value = random()
delay = randint(1, 4)
await asyncio.sleep(delay)
if value > 0.5:
raise AsyncException(f”Something bad happen after delay {delay} second(s)”)
print(f”Coro {name} is Done. with delay {delay} second(s)”)
return value
“`
## 并发执行方法的比较
> 一旦我们完成了准备工作,就该开始一天的旅程并系好安全带了。
### 1. asyncio.gather
> asyncio.gather 可用于启动一组后台任务,等待它们完成执行,并获取结果列表:
“`python
async def main():
aws, results = [], []
for i in range(3):
aws.append(asyncio.create_task(some_coro(f’name-{i}’)))
results = await asyncio.gather(*aws) # need to unpack the list
for result in results:
print(f”>got : {result}”)
asyncio.run(main())
“`
> asyncio.gather 虽然构成了一组后台任务,但不能直接接受列表或集合作为参数。如果需要传入包含后台任务的列表,请解压。
>
asyncio.gather 接受 return_exceptions 参数。当 return_exception 的值为 False 时,如果任何后台任务引发异常,则会将其引发给 gather 方法的调用方。并且 gather 方法的结果列表为空。
“`python
async def main():
aws, results = [], []
for i in range(3):
aws.append(asyncio.create_task(some_coro(f’name-{i}’)))
try:
results = await asyncio.gather(*aws, return_exceptions=False) # need to unpack the list
except AsyncException as e:
print(e)
for result in results:
print(f”>got : {result}”)
asyncio.run(main())
“`

> 当 return_exception 值为 True 时,后台任务抛出的异常不会影响其他任务的执行,最终会合并到结果列表中并一起返回
“`python
results = await asyncio.gather(*aws, return_exceptions=True)
“`

接下来,让我们看看为什么 gather 方法不能直接接受列表,而必须解压缩列表。因为当一个列表被填写和执行时,在我们等待它们完成的同时,很难将新任务添加到列表中。但是 gather 方法可以使用嵌套组将已有任务与新任务混合,解决了中间无法添加新任务的问题:
“`python
async def main():
aws, results = [], []
for i in range(3):
aws.append(asyncio.create_task(some_coro(f’name-{i}’)))
group_1 = asyncio.gather(*aws) # note we don’t use await now
# when some situation happen, we may add a new task
group_2 = asyncio.gather(group_1, asyncio.create_task(some_coro(“a new task”)))
results = await group_2
for result in results:
print(f”>got : {result}”)
asyncio.run(main())
“`
但是,gather 不能直接设置 timeout 参数。如果需要为所有正在运行的任务设置超时,请使用这个姿势,它不够优雅。
“`python
async def main():
aws, results = [], []
for i in range(3):
aws.append(asyncio.create_task(some_coro(f’name-{i}’)))
results = await asyncio.wait_for(asyncio.gather(*aws), timeout=2)
for result in results:
print(f”>got : {result}”)
asyncio.run(main())
“`
### 2. asyncio.as_completed
有时,我们必须在完成一个后台任务后立即启动以下操作。例如,当我们爬取一些数据并立即调用机器学习模型进行计算时,gather 方法无法满足我们的需求,但我们可以使用 as_completed 方法。
> 在使用asyncio.as_completed方法之前,我们先看一下该方法的源代码。
“`python
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
# …
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
“`
源代码显示 as_completed 不是并发方法,并返回一个带有 yield 语句的迭代器。所以我们可以直接对每个已完成的后台任务进行迭代,并且可以单独处理每个任务的异常,而不会影响其他任务的执行:
“`python
async def main():
aws = []
for i in range(5):
aws.append(asyncio.create_task(some_coro(f”name-{i}”)))
for done in asyncio.as_completed(aws): # we don’t need to unpack the list
try:
result = await done
print(f”>got : {result}”)
except AsyncException as e:
print(e)
asyncio.run(main())
“`
as_completed 接受 timeout 参数,超时发生后当前迭代的任务将抛出 asyncio。TimeoutError 中:
“`python
async def main():
aws = []
for i in range(5):
aws.append(asyncio.create_task(some_coro(f”name-{i}”)))
for done in asyncio.as_completed(aws, timeout=2): # we don’t need to unpack the list
try:
result = await done
print(f”>got : {result}”)
except AsyncException as e:
print(e)
except asyncio.TimeoutError: # we need to handle the TimeoutError
print(“time out.”)
asyncio.run(main())
“`

as_completed在处理任务执行结果方面比 gather 灵活得多,但在等待期间很难将新任务添加到原始任务列表中。
### 3. asyncio.wait
asyncio.wait 的调用方式与 as_completed 相同,但返回一个包含两组的元组:done 和 pending。done 保存已完成执行的任务,而 pending 保存仍在运行的任务。
> asyncio.wait 接受一个 return_when 参数,该参数可以采用三个枚举值:
– 当 return_when 为 asyncio 时。ALL_COMPLETED,done 存储所有已完成的任务,而 pending 为空。
– 当 return_when 为 asyncio 时。FIRST_COMPLETED,done 保存所有已完成的任务,而 pending 保存仍在运行的任务。
“`python
async def main():
aws = set()
for i in range(5):
aws.add(asyncio.create_task(some_coro(f”name-{i}”)))
done, pending = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)
for task in done:
try:
result = await task
print(f”>got : {result}”)
except AsyncException as e:
print(e)
print(f”the length of pending is {len(pending)}”)
asyncio.run(main())
“`

– 当 return_when 为 asyncio 时。FIRST_EXCEPTION,done 存储已引发异常并已完成执行的任务,而 pending 保存仍在运行的任务。
当 return_when 为 asyncio 时。FIRST_COMPLETED 或 asyncio 的 Alpha 或 asyncio 中。FIRST_EXECEPTION,我们可以递归调用 asyncio.wait,这样我们就可以添加新任务并继续等待所有任务完成,具体取决于情况。
“`python
async def main():
pending = set()
for i in range(5):
pending.add(asyncio.create_task(some_coro(f”name-{i}”))) # note the type and name of the task list
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
for task in done:
try:
result = await task
print(f”>got : {result}”)
except AsyncException as e:
print(e)
pending.add(asyncio.create_task(some_coro(“a new task”)))
print(f”the length of pending is {len(pending)}”)
asyncio.run(main())
“`

### 4. asyncio.TaskGroup
在 Python 3.11 中,asyncio 引入了新的 TaskGroup API,它正式使 Python 能够支持结构化并发。此功能允许您以更 Pythonic 的方式管理并发任务的生命周期。由于篇幅有限,这里就不多说了,但有兴趣的读者可以参考其他的文章
## 结论
本文介绍了 asyncio.gather、asyncio.as_completed 和 asyncio.wait API,还回顾了新的 asyncio。Python 3.11 中引入的 TaskGroup 功能。
根据实际需求使用这些后台任务管理方法,可以让我们的 asyncio 并发编程更加灵活。
来源链接:https://www.cnblogs.com/flyxi/p/18681281










没有回复内容