前言:为什么你需要学 asyncio?
你有没有遇到过这种情况:写了个爬虫抓100个页面,用 requests 一个个发,等了5分钟才跑完。然后你听说 Python 有个叫 asyncio 的东西能让速度翻10倍,打开文档一看——event loop、coroutine、awaitable……直接劝退。
别慌。asyncio 的核心思想就一句话:在等待 IO 的时候去干别的事。 就像你煮水的同时切菜,而不是傻站着等水开。
本教程用可运行、能复制粘贴立刻见效的代码带你从零掌握 asyncio,覆盖协程基础、Task并发、aiohttp异步爬虫、以及和 FastAPI(你如果看过本站的 FastAPI教程 就知道它底层完全靠 asyncio)的关系。
1. 同步 vs 异步:直观感受差距
先用一个简单的对比让数字说话。假设我们有3个任务,每个需要”等待”1秒(模拟网络请求):
import time
import asyncio
# ===== 同步版本 =====
def sync_task(name, delay):
print(f"{name} 开始")
time.sleep(delay) # 阻塞——CPU干等
print(f"{name} 完成")
def sync_main():
start = time.time()
for i in range(3):
sync_task(f"任务{i}", 1)
print(f"同步耗时: {time.time() - start:.1f}s")
sync_main()
# 任务0 开始
# 任务0 完成
# 任务1 开始
# 任务1 完成
# 任务2 开始
# 任务2 完成
# 同步耗时: 3.0s
# ===== 异步版本 =====
async def async_task(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay) # 非阻塞——让出控制权
print(f"{name} 完成")
async def async_main():
start = time.time()
await asyncio.gather(
async_task("任务0", 1),
async_task("任务1", 1),
async_task("任务2", 1),
)
print(f"异步耗时: {time.time() - start:.1f}s")
asyncio.run(async_main())
# 任务0 开始
# 任务1 开始
# 任务2 开始 ← 三个任务几乎同时开始
# 任务0 完成
# 任务1 完成
# 任务2 完成
# 异步耗时: 1.0s ← 3秒变1秒
核心区别:time.sleep() 把整个线程卡住;asyncio.sleep() 在等待期间把控制权交还给 event loop,让它去执行其他协程。
2. async/await 语法速查
asyncio 的世界有三个核心概念:
| 概念 | 说明 | 类比 |
|---|---|---|
async def |
定义一个协程函数(coroutine function) | 定义一个”可以暂停”的函数 |
await |
暂停当前协程,等待另一个协程或 Future 完成 | “你继续,我等这个结果” |
asyncio.run() |
启动 event loop 并运行协程 | 调度中心:统一管理所有协程 |
# 最小可运行例子
import asyncio
async def hello():
await asyncio.sleep(0.5)
return "Hello, asyncio!"
result = asyncio.run(hello())
print(result) # Hello, asyncio!
3. 并发执行:gather、create_task 和 TaskGroup
asyncio 提供了多种并发方式,各有适用场景:
3.1 asyncio.gather —— 最常用的并发
import asyncio
async def fetch(url):
print(f"正在请求 {url}...")
await asyncio.sleep(1) # 模拟网络请求
return f"{url} 的数据"
async def main():
results = await asyncio.gather(
fetch("https://api.example.com/users"),
fetch("https://api.example.com/posts"),
fetch("https://api.example.com/comments"),
)
print(results)
asyncio.run(main())
# 正在请求 https://api.example.com/users...
# 正在请求 https://api.example.com/posts...
# 正在请求 https://api.example.com/comments...
# ['https://api.example.com/users 的数据', ...]
特点:gather 按传入顺序返回结果。任何一个协程抛异常,gather 会把异常传播出来(除非设 return_exceptions=True)。
3.2 create_task —— 创建后台任务
async def main():
# 创建 Task(立即被调度,不等 await)
task1 = asyncio.create_task(fetch("url1"))
task2 = asyncio.create_task(fetch("url2"))
# 在这期间可以做别的事
print("两个请求已发出,我干点别的...")
await asyncio.sleep(0.5)
# 最后收集结果
r1 = await task1
r2 = await task2
print(r1, r2)
gather vs create_task 选择:
- 知道所有任务、要等全部完成 →
gather - 动态创建任务、想边发边干别的 →
create_task
3.3 TaskGroup(Python 3.11+)—— 结构化并发
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(fetch("url1"))
tg.create_task(fetch("url2"))
# 退出 with 块时自动等待所有任务完成
print("全部完成!")
相比 gather,TaskGroup 的好处是:任何一个子任务出错,其他子任务自动取消,不会留下僵尸任务。
4. 实战:用 aiohttp 做异步爬虫
asyncio 最经典的应用场景就是异步爬虫。我们把之前 Python 爬虫入门教程 里的 requests 同步方案升级为异步:
import asyncio
import aiohttp
import time
# 安装: pip install aiohttp
async def get_title(session, url):
try:
async with session.get(url, timeout=10) as resp:
html = await resp.text()
# 简单提取 <title> 标签
start = html.find("<title>") + 7
end = html.find("</title>", start)
title = html[start:end] if start > 6 else "无标题"
return url, resp.status, title[:60]
except Exception as e:
return url, 0, str(e)[:50]
async def main():
urls = [
"https://www.python.org",
"https://docs.python.org/3/library/asyncio.html",
"https://pypi.org/project/aiohttp/",
"https://github.com",
"https://fastapi.tiangolo.com",
"https://www.devlearn.club",
]
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [get_title(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"抓取 {len(urls)} 个页面,耗时 {elapsed:.2f}s\n")
for url, status, title in results:
print(f"[{status}] {url}")
print(f" → {title}")
asyncio.run(main())
# 抓取 6 个页面,耗时 1.23s
同样的任务用 requests 同步版要 6+ 秒。这就是 asyncio 的威力——IO 密集场景下,速度几乎等于最慢那个请求的时间。
4.1 进阶:限制并发数(Semaphore)
直接对100个URL发起100个并发请求可能被服务器封IP,也可能打满带宽。用 asyncio.Semaphore 控制并发:
async def fetch_with_limit(session, url, semaphore):
async with semaphore: # 最多 N 个协程同时进入
return await get_title(session, url)
async def main():
sem = asyncio.Semaphore(10) # 最多同时10个请求
async with aiohttp.ClientSession() as session:
tasks = [fetch_with_limit(session, url, sem) for url in urls]
results = await asyncio.gather(*tasks)
5. 常见陷阱与最佳实践
5.1 不要在协程里调用同步阻塞函数
# ❌ 错误:time.sleep 阻塞了整个 event loop
async def bad_example():
time.sleep(5) # 整个事件循环卡住5秒
# ✅ 正确:使用 asyncio 版本的 sleep
async def good_example():
await asyncio.sleep(5)
# ✅ 或者把阻塞调用丢到线程池
async def better_example():
await asyncio.to_thread(time.sleep, 5) # Python 3.9+
5.2 别忘了 await
async def main():
# ❌ 不会被执行——协程只是创建了,没有调度
fetch("url")
# ✅ 必须 await 或 create_task
await fetch("url")
# 或
asyncio.create_task(fetch("url"))
5.3 异常处理
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
return_exceptions=True # 异常不会中断其他任务
)
for r in results:
if isinstance(r, Exception):
print(f"出错: {r}")
else:
print(f"成功: {r}")
5.4 Event Loop 冲突
在 Jupyter Notebook、某些 Web 框架或已经运行 event loop 的环境里,asyncio.run() 会报 RuntimeError: Event loop is already running。解决方案:
# Jupyter 里直接用 await(不需要 asyncio.run())
await fetch("url")
# 或者在已有 loop 的环境里
loop = asyncio.get_event_loop()
loop.create_task(fetch("url"))
6. asyncio 在 Web 框架中的应用
你之前在本站看到的 FastAPI 教程 和 Flask 教程,它们对异步的处理完全不同:
| FastAPI | Flask | |
|---|---|---|
| 路由定义 | async def |
def(同步) |
| 数据库查询 | await db.execute() |
db.execute() 阻塞 |
| 并发请求 | 自动 async,高并发性能好 | 需要 Gunicorn + workers 弥补 |
| 适用场景 | API、微服务、实时应用 | 传统网站、CMS |
如果你要用 SQLAlchemy 做异步数据库操作,可以参考本站 SQLAlchemy Async 教程。
7. 速查表:常用 asyncio API
| API | 用途 | Python 版本 |
|---|---|---|
asyncio.run(coro) |
启动 event loop 运行协程 | 3.7+ |
asyncio.sleep(n) |
异步等待 n 秒 | 3.4+ |
asyncio.gather(*coros) |
并发运行多个协程,返回结果列表 | 3.4+ |
asyncio.create_task(coro) |
创建 Task 并加入 event loop | 3.7+ |
asyncio.TaskGroup() |
结构化并发(自动取消) | 3.11+ |
asyncio.Semaphore(n) |
限制并发数 | 3.4+ |
asyncio.to_thread(func) |
在线程池中执行同步函数 | 3.9+ |
asyncio.wait_for(coro, n) |
设置超时,超时抛 TimeoutError | 3.4+ |
asyncio.Queue() |
生产者-消费者模式异步队列 | 3.4+ |
常见问题(FAQ)
Q1: asyncio 能让我所有的 Python 代码变快吗?
不能。 asyncio 加速的是 IO 密集型 任务(网络请求、文件读写、数据库查询),不是 CPU 密集型任务(数学计算、图像处理)。如果你的代码大部分时间在 CPU 上跑,用 multiprocessing 或者换语言更有效。
简单判断:你的代码里 time.sleep() / 网络请求多不多?多 → asyncio 有戏。纯 for 循环加加减减 → 别折腾。
Q2: aiohttp 和 requests 用哪个?
Q3: asyncio 和 threading 有什么区别?什么时候用哪个?
一句话:IO密集用 asyncio,CPU密集用 multiprocessing,threading 基本可以忘了。
Python 的 GIL(全局解释器锁)让 threading 在 CPU 计算上毫无优势。asyncio 是单线程、单进程,通过事件循环实现并发——没有 GIL 竞争、没有线程切换开销、没有死锁问题。对于网络请求、数据库查询、文件读写等 IO 操作,asyncio 是最优解。
只有在需要调用不支持 async 的 C 库(通过 asyncio.to_thread 也救不了)时,才考虑 threading。
总结
asyncio 不是银弹,但它是 Python 解决 IO 并发问题最优雅的方案。三个关键点记住:
- 遇到 await 就暂停,先去干别的
- gather 批量并发,Semaphore 控制上限
- 别在协程里调同步阻塞函数
下一步建议:把之前用 requests 写的爬虫改成 aiohttp 版,亲身体验速度差;然后看看 FastAPI 的源码,你会发现路由处理函数全是 async def——这就是 asyncio 在生产环境中最常见的形态。