Python实战:用SQLAlchemy 2.0实现异步数据库操作

📝 483 字 · ☕ 2 分钟阅读

Python实战:用SQLAlchemy 2.0实现异步数据库操作

前言

在当今的高并发Web应用和微服务架构中,异步编程已成为标配。而数据库操作往往是性能瓶颈所在——如果数据库访问是同步的,即使Web框架本身是异步的,也会因为阻塞线程而失去异步的优势。Python的SQLAlchemy 2.0版本提供了成熟的异步ORM支持,让我们可以在FastAPI、Quart等异步框架中优雅地操作数据库。

环境准备

首先,安装必要的依赖:

pip install sqlalchemy[asyncio]>=2.0 asyncpg pydantic

其中asyncpg是Python最高效的PostgreSQL异步驱动,配合SQLAlchemy使用效果最佳。

项目目录结构如下:

async_db_demo/
├── models.py      # 数据库模型
├── crud.py        # 数据操作
├── main.py        # 入口和示例
└── config.py      # 数据库配置

核心实现

步骤1:配置数据库连接

在config.py中设置异步数据库引擎:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine = create_async_engine(DATABASE_URL, echo=True, pool_size=10, max_overflow=20)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

注意URL前缀使用postgresql+asyncpg而非传统的postgresql://。SQLAlchemy通过URL前缀来识别应该使用哪个数据库驱动。

步骤2:定义模型

在models.py中创建异步ORM模型:

from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime, func
import datetime

class Base(DeclarativeBase):
    pass

class Article(Base):
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(nullable=False)
    author: Mapped[str] = mapped_column(String(100), default="anonymous")
    created_at: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True), onupdate=func.now(), nullable=True
    )

SQLAlchemy 2.0推荐使用Mapped类型注解来代替早期的declarative风格,类型安全且IDE支持更好。

步骤3:实现CRUD操作

在crud.py中编写异步数据操作方法:

from sqlalchemy import select, delete, update
from sqlalchemy.ext.asyncio import AsyncSession
from models import Article

async def create_article(
    session: AsyncSession, title: str, content: str, author: str = "anonymous"
) -> Article:
    article = Article(title=title, content=content, author=author)
    session.add(article)
    await session.commit()
    await session.refresh(article)
    return article

async def get_article(session: AsyncSession, article_id: int) -> Article | None:
    result = await session.execute(select(Article).where(Article.id == article_id))
    return result.scalar_one_or_none()

async def list_articles(
    session: AsyncSession, skip: int = 0, limit: int = 10
) -> list[Article]:
    result = await session.execute(
        select(Article).offset(skip).limit(limit).order_by(Article.created_at.desc())
    )
    return list(result.scalars().all())

async def update_article(
    session: AsyncSession, article_id: int, **kwargs
) -> Article | None:
    article = await get_article(session, article_id)
    if article is None:
        return None
    for key, value in kwargs.items():
        setattr(article, key, value)
    await session.commit()
    await session.refresh(article)
    return article

async def delete_article(session: AsyncSession, article_id: int) -> bool:
    article = await get_article(session, article_id)
    if article is None:
        return False
    await session.delete(article)
    await session.commit()
    return True

步骤4:使用示例

在main.py中编写完整的异步使用流程:

import asyncio
from config import async_session, engine, Base
from models import Article
from crud import create_article, list_articles, get_article

async def main():
    # 创建所有表
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # 插入数据
    async with async_session() as session:
        article = await create_article(
            session,
            title="SQLAlchemy异步入门",
            content="本文介绍了如何在Python中使用SQLAlchemy 2.0...",
            author="devmaster"
        )
        print(f"Created article ID: {article.id}")

    # 查询数据
    async with async_session() as session:
        articles = await list_articles(session, limit=5)
        for a in articles:
            print(f"{a.id}: {a.title} by {a.author}")

    # 关闭引擎
    await engine.dispose()

if __name__ == "__main__":
    asyncio.run(main())

总结

  • SQLAlchemy 2.0的异步支持通过asyncio扩展实现,需要配合asyncpgaiomysql驱动使用
  • 使用async_sessionmaker创建异步session工厂,所有数据库操作都必须用await调用
  • 核心API(select/insert/update/delete)与同步版本保持一致,学习成本低
  • 建议在FastAPI等异步框架中配合依赖注入使用,进一步提升开发效率
  • 生产环境中注意连接池配置和事务边界管理

📤 分享这篇文章