Python asyncio教程:从协程基础到异步爬虫实战完整指南

📝 718 字 · ☕ 2 分钟阅读

前言:为什么你需要学 asyncio?

你有没有遇到过这种情况:写了个爬虫抓100个页面,用 requests 一个个发,等了5分钟才跑完。然后你听说 Python 有个叫 asyncio 的东西能让速度翻10倍,打开文档一看——event loop、coroutine、awaitable……直接劝退。

别慌。asyncio 的核心思想就一句话:在等待 IO 的时候去干别的事。 就像你煮水的同时切菜,而不是傻站着等水开。

本教程用可运行、能复制粘贴立刻见效的代码带你从零掌握 asyncio,覆盖协程基础、Task并发、aiohttp异步爬虫、以及和 FastAPI(你如果看过本站的 FastAPI教程 就知道它底层完全靠 asyncio)的关系。

1. 同步 vs 异步:直观感受差距

先用一个简单的对比让数字说话。假设我们有3个任务,每个需要”等待”1秒(模拟网络请求):

import time
import asyncio

# ===== 同步版本 =====
def sync_task(name, delay):
    print(f"{name} 开始")
    time.sleep(delay)  # 阻塞——CPU干等
    print(f"{name} 完成")

def sync_main():
    start = time.time()
    for i in range(3):
        sync_task(f"任务{i}", 1)
    print(f"同步耗时: {time.time() - start:.1f}s")

sync_main()
# 任务0 开始
# 任务0 完成
# 任务1 开始
# 任务1 完成
# 任务2 开始
# 任务2 完成
# 同步耗时: 3.0s

# ===== 异步版本 =====
async def async_task(name, delay):
    print(f"{name} 开始")
    await asyncio.sleep(delay)  # 非阻塞——让出控制权
    print(f"{name} 完成")

async def async_main():
    start = time.time()
    await asyncio.gather(
        async_task("任务0", 1),
        async_task("任务1", 1),
        async_task("任务2", 1),
    )
    print(f"异步耗时: {time.time() - start:.1f}s")

asyncio.run(async_main())
# 任务0 开始
# 任务1 开始
# 任务2 开始     ← 三个任务几乎同时开始
# 任务0 完成
# 任务1 完成
# 任务2 完成
# 异步耗时: 1.0s  ← 3秒变1秒

核心区别:time.sleep() 把整个线程卡住;asyncio.sleep() 在等待期间把控制权交还给 event loop,让它去执行其他协程。

2. async/await 语法速查

asyncio 的世界有三个核心概念:

概念 说明 类比
async def 定义一个协程函数(coroutine function) 定义一个”可以暂停”的函数
await 暂停当前协程,等待另一个协程或 Future 完成 “你继续,我等这个结果”
asyncio.run() 启动 event loop 并运行协程 调度中心:统一管理所有协程
# 最小可运行例子
import asyncio

async def hello():
    await asyncio.sleep(0.5)
    return "Hello, asyncio!"

result = asyncio.run(hello())
print(result)  # Hello, asyncio!

3. 并发执行:gather、create_task 和 TaskGroup

asyncio 提供了多种并发方式,各有适用场景:

3.1 asyncio.gather —— 最常用的并发

import asyncio

async def fetch(url):
    print(f"正在请求 {url}...")
    await asyncio.sleep(1)  # 模拟网络请求
    return f"{url} 的数据"

async def main():
    results = await asyncio.gather(
        fetch("https://api.example.com/users"),
        fetch("https://api.example.com/posts"),
        fetch("https://api.example.com/comments"),
    )
    print(results)

asyncio.run(main())
# 正在请求 https://api.example.com/users...
# 正在请求 https://api.example.com/posts...
# 正在请求 https://api.example.com/comments...
# ['https://api.example.com/users 的数据', ...]

特点:gather 按传入顺序返回结果。任何一个协程抛异常,gather 会把异常传播出来(除非设 return_exceptions=True)。

3.2 create_task —— 创建后台任务

async def main():
    # 创建 Task(立即被调度,不等 await)
    task1 = asyncio.create_task(fetch("url1"))
    task2 = asyncio.create_task(fetch("url2"))

    # 在这期间可以做别的事
    print("两个请求已发出,我干点别的...")
    await asyncio.sleep(0.5)

    # 最后收集结果
    r1 = await task1
    r2 = await task2
    print(r1, r2)

gather vs create_task 选择:

  • 知道所有任务、要等全部完成 → gather
  • 动态创建任务、想边发边干别的 → create_task

3.3 TaskGroup(Python 3.11+)—— 结构化并发

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(fetch("url1"))
        tg.create_task(fetch("url2"))
    # 退出 with 块时自动等待所有任务完成
    print("全部完成!")

相比 gather,TaskGroup 的好处是:任何一个子任务出错,其他子任务自动取消,不会留下僵尸任务。

4. 实战:用 aiohttp 做异步爬虫

asyncio 最经典的应用场景就是异步爬虫。我们把之前 Python 爬虫入门教程 里的 requests 同步方案升级为异步:

import asyncio
import aiohttp
import time

# 安装: pip install aiohttp

async def get_title(session, url):
    try:
        async with session.get(url, timeout=10) as resp:
            html = await resp.text()
            # 简单提取 <title> 标签
            start = html.find("<title>") + 7
            end = html.find("</title>", start)
            title = html[start:end] if start > 6 else "无标题"
            return url, resp.status, title[:60]
    except Exception as e:
        return url, 0, str(e)[:50]

async def main():
    urls = [
        "https://www.python.org",
        "https://docs.python.org/3/library/asyncio.html",
        "https://pypi.org/project/aiohttp/",
        "https://github.com",
        "https://fastapi.tiangolo.com",
        "https://www.devlearn.club",
    ]

    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [get_title(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"抓取 {len(urls)} 个页面,耗时 {elapsed:.2f}s\n")
    for url, status, title in results:
        print(f"[{status}] {url}")
        print(f"  → {title}")

asyncio.run(main())
# 抓取 6 个页面,耗时 1.23s

同样的任务用 requests 同步版要 6+ 秒。这就是 asyncio 的威力——IO 密集场景下,速度几乎等于最慢那个请求的时间。

4.1 进阶:限制并发数(Semaphore)

直接对100个URL发起100个并发请求可能被服务器封IP,也可能打满带宽。用 asyncio.Semaphore 控制并发:

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:  # 最多 N 个协程同时进入
        return await get_title(session, url)

async def main():
    sem = asyncio.Semaphore(10)  # 最多同时10个请求
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks)

5. 常见陷阱与最佳实践

5.1 不要在协程里调用同步阻塞函数

# ❌ 错误:time.sleep 阻塞了整个 event loop
async def bad_example():
    time.sleep(5)  # 整个事件循环卡住5秒

# ✅ 正确:使用 asyncio 版本的 sleep
async def good_example():
    await asyncio.sleep(5)

# ✅ 或者把阻塞调用丢到线程池
async def better_example():
    await asyncio.to_thread(time.sleep, 5)  # Python 3.9+

5.2 别忘了 await

async def main():
    # ❌ 不会被执行——协程只是创建了,没有调度
    fetch("url")

    # ✅ 必须 await 或 create_task
    await fetch("url")
    # 或
    asyncio.create_task(fetch("url"))

5.3 异常处理

results = await asyncio.gather(
    fetch("url1"),
    fetch("url2"),
    return_exceptions=True  # 异常不会中断其他任务
)
for r in results:
    if isinstance(r, Exception):
        print(f"出错: {r}")
    else:
        print(f"成功: {r}")

5.4 Event Loop 冲突

在 Jupyter Notebook、某些 Web 框架或已经运行 event loop 的环境里,asyncio.run() 会报 RuntimeError: Event loop is already running。解决方案:

# Jupyter 里直接用 await(不需要 asyncio.run())
await fetch("url")

# 或者在已有 loop 的环境里
loop = asyncio.get_event_loop()
loop.create_task(fetch("url"))

6. asyncio 在 Web 框架中的应用

你之前在本站看到的 FastAPI 教程Flask 教程,它们对异步的处理完全不同:

FastAPI Flask
路由定义 async def def(同步)
数据库查询 await db.execute() db.execute() 阻塞
并发请求 自动 async,高并发性能好 需要 Gunicorn + workers 弥补
适用场景 API、微服务、实时应用 传统网站、CMS

如果你要用 SQLAlchemy 做异步数据库操作,可以参考本站 SQLAlchemy Async 教程

7. 速查表:常用 asyncio API

API 用途 Python 版本
asyncio.run(coro) 启动 event loop 运行协程 3.7+
asyncio.sleep(n) 异步等待 n 秒 3.4+
asyncio.gather(*coros) 并发运行多个协程,返回结果列表 3.4+
asyncio.create_task(coro) 创建 Task 并加入 event loop 3.7+
asyncio.TaskGroup() 结构化并发(自动取消) 3.11+
asyncio.Semaphore(n) 限制并发数 3.4+
asyncio.to_thread(func) 在线程池中执行同步函数 3.9+
asyncio.wait_for(coro, n) 设置超时,超时抛 TimeoutError 3.4+
asyncio.Queue() 生产者-消费者模式异步队列 3.4+

常见问题(FAQ)

Q1: asyncio 能让我所有的 Python 代码变快吗?

不能。 asyncio 加速的是 IO 密集型 任务(网络请求、文件读写、数据库查询),不是 CPU 密集型任务(数学计算、图像处理)。如果你的代码大部分时间在 CPU 上跑,用 multiprocessing 或者换语言更有效。

简单判断:你的代码里 time.sleep() / 网络请求多不多?多 → asyncio 有戏。纯 for 循环加加减减 → 别折腾。

Q2: aiohttp 和 requests 用哪个?

看场景:

  • 发1-3个请求requests,简单够用
  • 爬10+页面、API并发调用 → aiohttp + asyncio,速度碾压
  • 需要浏览器自动化 → 用 Selenium 或 Playwright,它们内部也支持 async API

Q3: asyncio 和 threading 有什么区别?什么时候用哪个?

一句话:IO密集用 asyncio,CPU密集用 multiprocessing,threading 基本可以忘了。

Python 的 GIL(全局解释器锁)让 threading 在 CPU 计算上毫无优势。asyncio 是单线程、单进程,通过事件循环实现并发——没有 GIL 竞争、没有线程切换开销、没有死锁问题。对于网络请求、数据库查询、文件读写等 IO 操作,asyncio 是最优解。
只有在需要调用不支持 async 的 C 库(通过 asyncio.to_thread 也救不了)时,才考虑 threading。

总结

asyncio 不是银弹,但它是 Python 解决 IO 并发问题最优雅的方案。三个关键点记住:

  1. 遇到 await 就暂停,先去干别的
  2. gather 批量并发,Semaphore 控制上限
  3. 别在协程里调同步阻塞函数

下一步建议:把之前用 requests 写的爬虫改成 aiohttp 版,亲身体验速度差;然后看看 FastAPI 的源码,你会发现路由处理函数全是 async def——这就是 asyncio 在生产环境中最常见的形态。

📤 分享这篇文章