FastAPI Database Queries
Introduction
In web applications, interacting with databases is a fundamental requirement. FastAPI does not ship its own database layer, but it works well with SQLAlchemy, a SQL toolkit and Object-Relational Mapping (ORM) library for Python. This guide walks you through common database queries in FastAPI applications.
By the end of this tutorial, you'll understand how to:
- Set up database connections in FastAPI
- Perform CRUD operations (Create, Read, Update, Delete)
- Implement filtering, sorting, and pagination
- Handle database relationships
- Optimize your database queries
Prerequisites
Before we dive in, ensure you have:
- Basic knowledge of FastAPI
- Python 3.7+ installed
- Understanding of SQL fundamentals
- Familiarity with async programming concepts
Setting Up the Database Connection
First, let's set up a database connection using SQLAlchemy with FastAPI:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine
# SQLAlchemy 1.4+ exposes declarative_base from sqlalchemy.orm;
# on older versions import it from sqlalchemy.ext.declarative instead
from sqlalchemy.orm import declarative_base, sessionmaker, Session
# Database URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# For PostgreSQL: "postgresql://user:password@localhost/dbname"
# Create SQLAlchemy engine
# check_same_thread is a SQLite-only option; drop connect_args for other databases
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()
app = FastAPI()
# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
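The try/finally in get_db is what guarantees cleanup. As a rough illustration, the sketch below drives the same generator pattern by hand with a stand-in session. FakeSession, get_db_sketch and run_handler are hypothetical names used only for this demonstration; FastAPI's own dependency machinery is more involved, but the effect on close() is the same.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session; it only records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db_sketch(session):
    # Same shape as get_db: hand the session out, always close it afterwards
    try:
        yield session
    finally:
        session.close()


def run_handler(session, handler):
    # Enter the generator, run the handler, and always exit the generator
    with contextmanager(get_db_sketch)(session) as db:
        return handler(db)


ok_session = FakeSession()
run_handler(ok_session, lambda db: "ok")

failing_session = FakeSession()
try:
    run_handler(failing_session, lambda db: 1 / 0)  # handler raises
except ZeroDivisionError:
    pass
```

In both cases the session ends up closed, including when the handler raises.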
Defining Database Models
Let's create some models to work with:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    items = relationship("Item", back_populates="owner")
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")
Now, let's create the tables in the database:
# Create tables in the database
Base.metadata.create_all(bind=engine)
Basic CRUD Operations
Create Operation
Let's start with creating a new user:
from pydantic import BaseModel
# Pydantic model for request data validation
class UserCreate(BaseModel):
    email: str
    password: str
# Create a new user
@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    # Check if user already exists
    db_user = db.query(User).filter(User.email == user.email).first()
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    
    # Create a new User instance
    db_user = User(email=user.email, hashed_password=user.password)  # In real apps, hash the password!
    
    # Add to session and commit
    db.add(db_user)
    db.commit()
    db.refresh(db_user)  # Refresh to get generated id
    
    return db_user
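Input:
{
  "email": "user@example.com",
  "password": "secretpassword"
}
Output (as returned when the endpoint declares a response model that omits hashed_password; without one, that column is serialized as well):
{
  "id": 1,
  "email": "user@example.com",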
  "is_active": true
}
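The create endpoint stores the password as received and only notes that real applications should hash it. One possible way to do that with the standard library is sketched below. The helper names hash_password and verify_password, the storage format and the iteration count are assumptions made for this example; a dedicated password-hashing library is a common alternative.

```python
import hashlib
import hmac
import os

ITERATIONS = 600_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    # A fresh random salt per password; the salt is stored next to the digest
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    # Recompute the digest with the stored salt and iteration count
    _scheme, iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    # Constant-time comparison
    return hmac.compare_digest(candidate.hex(), digest_hex)


stored = hash_password("secretpassword")
```

With helpers like these, create_user would pass hash_password(user.password) as hashed_password instead of the raw value.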
Read Operations
Get a single user by ID
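# Pydantic model for response.
# Note: this reuses the name of the SQLAlchemy User model defined earlier. In a single
# module the later class shadows the earlier one, so keep ORM models and Pydantic
# schemas in separate modules (for example models.py and schemas.py) or rename one.
class User(BaseModel):
    id: int
    email: str
    is_active: bool
    
    class Config:
        orm_mode = True  # Pydantic v1; in Pydantic v2 use from_attributes = True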
@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
Get all users
from typing import List
@app.get("/users/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # A fixed ordering keeps skip/limit windows stable between requests
    users = db.query(User).order_by(User.id).offset(skip).limit(limit).all()
    return users
Output:
[
  {
    "id": 1,
    "email": "user@example.com",
    "is_active": true
  },
  {
    "id": 2,
    "email": "another.user@example.com",
    "is_active": true
  }
]
Update Operation
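from typing import Optional
class UserUpdate(BaseModel):
    # Optional fields: anything the client omits is left unchanged
    email: Optional[str] = None
    is_active: Optional[bool] = None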
@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    
    # Update user attributes
    for key, value in user.dict(exclude_unset=True).items():
        setattr(db_user, key, value)
    
    db.commit()
    db.refresh(db_user)
    return db_user
Input:
{
  "email": "new.address@example.com",
  "is_active": false
}
Output:
{
  "id": 1,
  "email": "new.address@example.com",
  "is_active": false
}
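The update endpoint relies on exclude_unset=True so that only the fields the client actually sent are written. The sketch below mimics that behaviour without Pydantic, using a plain JSON body and a SimpleNamespace as a stand-in for the database row. UPDATABLE_FIELDS and apply_partial_update are hypothetical names for this illustration.

```python
import json
from types import SimpleNamespace

# Assumed whitelist of fields a client may change
UPDATABLE_FIELDS = {"email", "is_active"}


def apply_partial_update(row, raw_body: str):
    # Only keys present in the request body are applied; omitted keys stay untouched
    changes = json.loads(raw_body)
    for key, value in changes.items():
        if key in UPDATABLE_FIELDS:
            setattr(row, key, value)
    return row


row = SimpleNamespace(id=1, email="user@example.com", is_active=True)

# The client sends only is_active, so email must not be reset to None
apply_partial_update(row, '{"is_active": false}')

# Fields outside the whitelist, such as id, are ignored
apply_partial_update(row, '{"id": 99}')
```

Without this distinction between "not sent" and "sent as null", a request that omits email would overwrite it with None.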
Delete Operation
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    
    db.delete(db_user)
    db.commit()
    return {"detail": "User deleted successfully"}
Output:
{
  "detail": "User deleted successfully"
}
Advanced Queries
Filtering
Let's implement a more advanced user search with multiple filters:
from typing import List, Optional
@app.get("/users/search/", response_model=List[User])
def search_users(
    email: Optional[str] = None,
    is_active: Optional[bool] = None,
    skip: int = 0, 
    limit: int = 100, 
    db: Session = Depends(get_db)
):
    query = db.query(User)
    
    # Apply filters if provided
    if email:
        query = query.filter(User.email.contains(email))
    if is_active is not None:
        query = query.filter(User.is_active == is_active)
    
    users = query.offset(skip).limit(limit).all()
    return users
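To see the conditional filter pattern in isolation, here is a small self-contained sketch against an in-memory SQLite database. It assumes SQLAlchemy 1.4 or newer; the trimmed User model, the sample rows and the search helper are made up for this illustration.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True)
    is_active = Column(Boolean, default=True)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)


def search(db, email=None, is_active=None):
    # Start from the unfiltered query and narrow it only for provided arguments
    query = db.query(User)
    if email:
        query = query.filter(User.email.contains(email))
    if is_active is not None:
        query = query.filter(User.is_active == is_active)
    return [user.email for user in query.order_by(User.id).all()]


with Session(engine) as db:
    db.add_all([
        User(email="alice@example.com", is_active=True),
        User(email="bob@example.com", is_active=False),
        User(email="carol@test.org", is_active=True),
    ])
    db.commit()

    everyone = search(db)
    example_domain = search(db, email="example")
    inactive = search(db, is_active=False)
    active_example = search(db, email="example", is_active=True)
```

Checking is_active against None rather than truthiness matters here: is_active=False is a valid filter value and must not be skipped.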
Sorting
Add sorting capabilities to the user listing endpoint:
from sqlalchemy import desc
@app.get("/users/sorted/", response_model=List[User])
def sorted_users(
    sort_by: str = "id",
    descending: bool = False,
    skip: int = 0, 
    limit: int = 100, 
    db: Session = Depends(get_db)
):
    # Validate sort_by field
    if sort_by not in ["id", "email", "is_active"]:
        sort_by = "id"  # Default sort field
    
    # Get sort column
    sort_column = getattr(User, sort_by)
    
    # Apply sorting direction
    if descending:
        sort_column = desc(sort_column)
    
    users = db.query(User).order_by(sort_column).offset(skip).limit(limit).all()
    return users
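A variation on the whitelist check is to map each allowed name directly to its column, so that nothing outside the mapping can ever reach getattr. The sketch below assumes SQLAlchemy 1.4 or newer and rejects unknown fields with an error instead of silently falling back to id. SORTABLE and sorted_emails are hypothetical names, and the model is trimmed for the example.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    hashed_password = Column(String)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

# Explicit mapping from public sort names to columns
SORTABLE = {"id": User.id, "email": User.email}


def sorted_emails(db, sort_by="id", descending=False):
    column = SORTABLE.get(sort_by)
    if column is None:
        raise ValueError(f"unsupported sort field: {sort_by}")
    if descending:
        column = column.desc()
    return [user.email for user in db.query(User).order_by(column).all()]


with Session(engine) as db:
    db.add_all([
        User(id=1, email="carol@example.com", hashed_password="x"),
        User(id=2, email="alice@example.com", hashed_password="y"),
        User(id=3, email="bob@example.com", hashed_password="z"),
    ])
    db.commit()

    by_id = sorted_emails(db)
    by_email_desc = sorted_emails(db, sort_by="email", descending=True)
    try:
        sorted_emails(db, sort_by="hashed_password")
        rejected = False
    except ValueError:
        rejected = True
```

In an endpoint, the ValueError would typically be translated into a 400 or 422 response.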
Pagination
We can enhance our pagination with more information about total counts:
from math import ceil
class PaginatedResponse(BaseModel):
    items: List[User]
    total: int
    page: int
    pages: int
    size: int
@app.get("/users/paginated/", response_model=PaginatedResponse)
def paginated_users(
    page: int = 1,
    size: int = 10,
    db: Session = Depends(get_db)
):
    # Ensure valid pagination parameters
    if page < 1:
        page = 1
    if size < 1:
        size = 10
        
    # Calculate offset
    skip = (page - 1) * size
    
    # Execute query (a fixed ordering keeps page contents stable between requests)
    users = db.query(User).order_by(User.id).offset(skip).limit(size).all()
    
    # Get total count for pagination info
    total = db.query(User).count()
    pages = ceil(total / size)
    
    return {
        "items": users,
        "total": total,
        "page": page,
        "pages": pages,
        "size": size
    }
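The pagination arithmetic can be checked on its own, without a database. The helper below is a sketch that mirrors the endpoint's defaults; page_window is a hypothetical name, and the max_size cap is an assumption added here, since the endpoint itself does not limit size.

```python
from math import ceil


def page_window(total: int, page: int = 1, size: int = 10, max_size: int = 100):
    # Same fallbacks as the endpoint: invalid values revert to the defaults
    if page < 1:
        page = 1
    if size < 1:
        size = 10
    size = min(size, max_size)  # assumed upper bound, not part of the endpoint

    offset = (page - 1) * size
    pages = ceil(total / size)
    return {"offset": offset, "limit": size, "page": page, "pages": pages}


last_page = page_window(total=95, page=10, size=10)
empty = page_window(total=0)
clamped = page_window(total=5, page=0, size=1000)
```

For 95 rows and a page size of 10, the tenth page starts at offset 90 and holds the remaining five rows.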
Working with Relationships
Let's create an endpoint to get items for a specific user:
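from typing import Optional
# Pydantic model for item responses. This name shadows the SQLAlchemy Item model when
# both are defined in one module; keep them in separate modules or rename one.
class Item(BaseModel):
    id: int
    title: str
    description: Optional[str] = None  # the column is nullable and ItemCreate allows omitting it
    
    class Config:
        orm_mode = True  # Pydantic v1; in Pydantic v2 use from_attributes = True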
@app.get("/users/{user_id}/items", response_model=List[Item])
def read_user_items(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    
    return db_user.items
And an endpoint to create an item for a user:
from typing import Optional
class ItemCreate(BaseModel):
    title: str
    description: Optional[str] = None
@app.post("/users/{user_id}/items", response_model=Item)
def create_item_for_user(
    user_id: int, 
    item: ItemCreate, 
    db: Session = Depends(get_db)
):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    
    db_item = Item(title=item.title, description=item.description, owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    
    return db_item
Query Optimization
Using Joins
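When you need data from related tables, loading it with a join avoids the extra query that lazy loading would otherwise issue when the relationship is accessed:
from sqlalchemy.orm import joinedload
class UserWithItems(User):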
    items: List[Item]
@app.get("/users-with-items/{user_id}", response_model=UserWithItems)
def get_user_with_items(user_id: int, db: Session = Depends(get_db)):
    # Using join to load related data in one query
    db_user = db.query(User).options(
        joinedload(User.items)
    ).filter(User.id == user_id).first()
    
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    
    return db_user
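One way to observe the difference is to count the SELECT statements SQLAlchemy sends with and without joinedload. The sketch below assumes SQLAlchemy 1.4 or newer and an in-memory SQLite database; the trimmed models, the sample data and the count_selects helper are made up for this illustration.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    title = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    # Collect every SQL string sent to the database
    statements.append(statement)


with Session(engine) as db:
    db.add(User(id=1, email="a@example.com", items=[Item(title="x"), Item(title="y")]))
    db.commit()


def count_selects(load_eagerly):
    statements.clear()
    with Session(engine) as db:
        query = db.query(User)
        if load_eagerly:
            query = query.options(joinedload(User.items))
        user = query.filter(User.id == 1).first()
        # Touching the relationship triggers a lazy load unless it was joined
        titles = sorted(item.title for item in user.items)
    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    return len(selects), titles


lazy_count, lazy_titles = count_selects(load_eagerly=False)
eager_count, eager_titles = count_selects(load_eagerly=True)
```

For a single user the saving is one query; when a list of users is loaded and each one's items are accessed, lazy loading issues one extra query per user.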
Using Specific Column Selection
Sometimes you don't need all columns from a table:
from sqlalchemy.orm import load_only
@app.get("/users-email-only/")
def get_users_email_only(db: Session = Depends(get_db)):
    # Only select the email column (the primary key is always loaded as well)
    users = db.query(User).options(load_only(User.email)).all()
    return [{"email": user.email} for user in users]
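When only plain values are needed rather than ORM objects, another option is to query the column itself, which returns lightweight rows. This is a sketch of that alternative, not the load_only approach shown above. It assumes SQLAlchemy 1.4 or newer; the trimmed model and sample data are made up, and an event listener records the SQL so the selected columns can be inspected.

```python
from sqlalchemy import Column, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    hashed_password = Column(String)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

statements = []


@event.listens_for(engine, "before_cursor_execute")
def record_statement(conn, cursor, statement, parameters, context, executemany):
    statements.append(statement)


with Session(engine) as db:
    db.add_all([
        User(email="a@example.com", hashed_password="secret-a"),
        User(email="b@example.com", hashed_password="secret-b"),
    ])
    db.commit()

    statements.clear()
    # Querying a column returns rows with that column only, not User instances
    rows = db.query(User.email).order_by(User.id).all()
    emails = [row.email for row in rows]
    last_sql = statements[-1]
```

The recorded statement selects users.email only, so the password column never leaves the database.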
Async Database Operations
For I/O-bound workloads, you can use SQLAlchemy's async API with FastAPI. This requires SQLAlchemy 1.4+ and an async driver such as asyncpg:
from fastapi import Depends, FastAPI
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Async database URL
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# Create async engine
async_engine = create_async_engine(ASYNC_DATABASE_URL)
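# Create async session factory
AsyncSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep attributes loaded after commit; expired attributes cannot lazy-load under asyncio
)
async def get_async_db():
    # The async context manager closes the session on exit, even if the handler raises
    async with AsyncSessionLocal() as session:
        yield session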
@app.get("/async-users/")
async def get_users_async(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users
Real-World Example: Blog API
Let's create a more complete example of a blog API with users, posts, and comments:
from datetime import datetime
from sqlalchemy import DateTime
# Models
class BlogUser(Base):
    __tablename__ = "blog_users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_admin = Column(Boolean, default=False)
    
    posts = relationship("BlogPost", back_populates="author")
    comments = relationship("Comment", back_populates="author")
class BlogPost(Base):
    __tablename__ = "blog_posts"
    
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    content = Column(String)
    published = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    author_id = Column(Integer, ForeignKey("blog_users.id"))
    
    author = relationship("BlogUser", back_populates="posts")
    comments = relationship("Comment", back_populates="post")
class Comment(Base):
    __tablename__ = "comments"
    
    id = Column(Integer, primary_key=True, index=True)
    text = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    post_id = Column(Integer, ForeignKey("blog_posts.id"))
    author_id = Column(Integer, ForeignKey("blog_users.id"))
    
    post = relationship("BlogPost", back_populates="comments")
    author = relationship("BlogUser", back_populates="comments")
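# Pydantic models
# Note: the Comment schema below shares its name with the SQLAlchemy Comment model above.
# In one module the later class shadows the earlier one, so split models and schemas
# into separate modules or rename one of them before running this example.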
class CommentBase(BaseModel):
    text: str
class CommentCreate(CommentBase):
    pass
class Comment(CommentBase):
    id: int
    post_id: int
    author_id: int
    created_at: datetime
    
    class Config:
        orm_mode = True
class PostBase(BaseModel):
    title: str
    content: str
    published: bool = True
class PostCreate(PostBase):
    pass
class Post(PostBase):
    id: int
    author_id: int
    created_at: datetime
    comments: List[Comment] = []
    
    class Config:
        orm_mode = True
# API endpoints
@app.post("/blog/posts/", response_model=Post)
def create_post(post: PostCreate, user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(BlogUser).filter(BlogUser.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")
    
    db_post = BlogPost(**post.dict(), author_id=user_id)
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post
@app.get("/blog/posts/", response_model=List[Post])
def read_posts(
    skip: int = 0, 
    limit: int = 10, 
    published_only: bool = True,
    db: Session = Depends(get_db)
):
    query = db.query(BlogPost)
    
    if published_only:
        query = query.filter(BlogPost.published.is_(True))
    
    posts = query.order_by(BlogPost.created_at.desc()).offset(skip).limit(limit).all()
    return posts
@app.get("/blog/posts/{post_id}", response_model=Post)
def read_post(post_id: int, db: Session = Depends(get_db)):
    post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return post
@app.post("/blog/posts/{post_id}/comments/", response_model=Comment)
def create_comment(
    post_id: int, 
    comment: CommentCreate, 
    user_id: int, 
    db: Session = Depends(get_db)
):
    post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    
    db_comment = Comment(**comment.dict(), post_id=post_id, author_id=user_id)
    db.add(db_comment)
    db.commit()
    db.refresh(db_comment)
    return db_comment
from sqlalchemy import or_
@app.get("/blog/search/", response_model=List[Post])
def search_posts(
    query: str, 
    skip: int = 0, 
    limit: int = 10, 
    db: Session = Depends(get_db)
):
    posts = db.query(BlogPost).filter(
        or_(
            BlogPost.title.contains(query),
            BlogPost.content.contains(query)
        )
    ).offset(skip).limit(limit).all()
    
    return posts
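One detail worth knowing about contains(): it renders a LIKE expression, so % and _ in the search term act as wildcards unless they are escaped. The sketch below, assuming SQLAlchemy 1.4 or newer and an in-memory SQLite database, compares the default behaviour with autoescape=True. The trimmed BlogPost model, the sample titles and the titles_matching helper are made up for this illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class BlogPost(Base):
    __tablename__ = "blog_posts"
    id = Column(Integer, primary_key=True)
    title = Column(String)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)


def titles_matching(db, term, escape):
    if escape:
        # Treat % and _ in the term as literal characters
        condition = BlogPost.title.contains(term, autoescape=True)
    else:
        condition = BlogPost.title.contains(term)
    posts = db.query(BlogPost).filter(condition).order_by(BlogPost.id).all()
    return [post.title for post in posts]


with Session(engine) as db:
    db.add_all([
        BlogPost(title="100% cotton shirts"),
        BlogPost(title="Top 100 recipes"),
    ])
    db.commit()

    unescaped = titles_matching(db, "100%", escape=False)
    escaped = titles_matching(db, "100%", escape=True)
```

Without escaping, the trailing % in the term is a wildcard and both titles match; with autoescape=True only the title containing the literal text "100%" is returned.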
Summary
In this guide, we've explored how to perform database queries in FastAPI applications using SQLAlchemy. We've covered:
- Setting up database connections
- Creating database models
- Performing CRUD operations:
  - Creating records
  - Reading records (single and multiple)
  - Updating records
  - Deleting records
- Implementing advanced queries with filtering, sorting, and pagination
- Working with relationships between models
- Optimizing queries
- Using async database operations
- Building a real-world blog API example
Database operations are fundamental to most web applications, and FastAPI's integration with SQLAlchemy provides a powerful and flexible way to work with databases. The combination of FastAPI's performance, SQLAlchemy's robust ORM capabilities, and Pydantic's data validation creates a solid foundation for building database-driven applications.
Additional Resources
To continue learning about FastAPI and databases:
- FastAPI Official Documentation on SQL Databases
- SQLAlchemy Documentation
- Alembic for Database Migrations
- Database Design Fundamentals
Exercises
- Basic Exercise: Create a simple todo application with FastAPI and SQLAlchemy that allows users to create, read, update, and delete tasks.
- Intermediate Exercise: Extend the blog API example to include functionality for categories and tags for blog posts.
- Advanced Exercise: Implement a complete user authentication system using FastAPI, SQLAlchemy, and JWT tokens, with role-based access control for different API endpoints.
With these exercises, you'll gain hands-on experience with FastAPI database operations and deepen your understanding of how to build robust database-driven applications.