📝 483 字 · ☕ 2 分钟阅读
Python实战:用SQLAlchemy 2.0实现异步数据库操作
前言
在当今的高并发Web应用和微服务架构中,异步编程已成为标配。而数据库操作往往是性能瓶颈所在——如果数据库访问是同步的,即使Web框架本身是异步的,也会因为阻塞线程而失去异步的优势。Python的SQLAlchemy 2.0版本提供了成熟的异步ORM支持,让我们可以在FastAPI、Quart等异步框架中优雅地操作数据库。
环境准备
首先,安装必要的依赖:
pip install sqlalchemy[asyncio]>=2.0 asyncpg pydantic
其中asyncpg是Python最高效的PostgreSQL异步驱动,配合SQLAlchemy使用效果最佳。
项目目录结构如下:
async_db_demo/
├── models.py # 数据库模型
├── crud.py # 数据操作
├── main.py # 入口和示例
└── config.py # 数据库配置
核心实现
步骤1:配置数据库连接
在config.py中设置异步数据库引擎:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
engine = create_async_engine(DATABASE_URL, echo=True, pool_size=10, max_overflow=20)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
注意URL前缀使用postgresql+asyncpg而非传统的postgresql://。SQLAlchemy通过URL前缀来识别应该使用哪个数据库驱动。
步骤2:定义模型
在models.py中创建异步ORM模型:
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, DateTime, func
import datetime
class Base(DeclarativeBase):
pass
class Article(Base):
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
content: Mapped[str] = mapped_column(nullable=False)
author: Mapped[str] = mapped_column(String(100), default="anonymous")
created_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), onupdate=func.now(), nullable=True
)
SQLAlchemy 2.0推荐使用Mapped类型注解来代替早期的declarative风格,类型安全且IDE支持更好。
步骤3:实现CRUD操作
在crud.py中编写异步数据操作方法:
from sqlalchemy import select, delete, update
from sqlalchemy.ext.asyncio import AsyncSession
from models import Article
async def create_article(
session: AsyncSession, title: str, content: str, author: str = "anonymous"
) -> Article:
article = Article(title=title, content=content, author=author)
session.add(article)
await session.commit()
await session.refresh(article)
return article
async def get_article(session: AsyncSession, article_id: int) -> Article | None:
result = await session.execute(select(Article).where(Article.id == article_id))
return result.scalar_one_or_none()
async def list_articles(
session: AsyncSession, skip: int = 0, limit: int = 10
) -> list[Article]:
result = await session.execute(
select(Article).offset(skip).limit(limit).order_by(Article.created_at.desc())
)
return list(result.scalars().all())
async def update_article(
session: AsyncSession, article_id: int, **kwargs
) -> Article | None:
article = await get_article(session, article_id)
if article is None:
return None
for key, value in kwargs.items():
setattr(article, key, value)
await session.commit()
await session.refresh(article)
return article
async def delete_article(session: AsyncSession, article_id: int) -> bool:
article = await get_article(session, article_id)
if article is None:
return False
await session.delete(article)
await session.commit()
return True
步骤4:使用示例
在main.py中编写完整的异步使用流程:
import asyncio
from config import async_session, engine, Base
from models import Article
from crud import create_article, list_articles, get_article
async def main():
# 创建所有表
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 插入数据
async with async_session() as session:
article = await create_article(
session,
title="SQLAlchemy异步入门",
content="本文介绍了如何在Python中使用SQLAlchemy 2.0...",
author="devmaster"
)
print(f"Created article ID: {article.id}")
# 查询数据
async with async_session() as session:
articles = await list_articles(session, limit=5)
for a in articles:
print(f"{a.id}: {a.title} by {a.author}")
# 关闭引擎
await engine.dispose()
if __name__ == "__main__":
asyncio.run(main())
总结
- SQLAlchemy 2.0的异步支持通过
asyncio扩展实现,需要配合asyncpg或aiomysql驱动使用 - 使用
async_sessionmaker创建异步session工厂,所有数据库操作都必须用await调用 - 核心API(select/insert/update/delete)与同步版本保持一致,学习成本低
- 建议在FastAPI等异步框架中配合依赖注入使用,进一步提升开发效率
- 生产环境中注意连接池配置和事务边界管理