FastAPI Task Monitoring
In production applications, executing tasks in the background is only half the battle. Knowing the status of those tasks, troubleshooting failures, and understanding performance metrics are equally important. In this tutorial, we'll explore how to implement effective monitoring for your FastAPI background tasks.
Why Monitor Background Tasks?
Background tasks operate asynchronously, detached from the main request flow. This separation provides several advantages but makes it challenging to know:
- Has the task started?
- Is it still running?
- Did it complete successfully?
- How long did it take?
- What went wrong if it failed?
Proper monitoring answers these questions and helps maintain reliable services.
Basic Logging for Background Tasks
Let's start with the simplest approach: adding logging to your background tasks.
import logging
import time
from fastapi import FastAPI, BackgroundTasks
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI()
def process_data(item_id: int):
    logger.info(f"Starting background task for item {item_id}")
    try:
        # Simulate processing time
        time.sleep(5)
        logger.info(f"Background task for item {item_id} completed successfully")
    except Exception as e:
        logger.error(f"Background task for item {item_id} failed: {str(e)}")
        raise
@app.post("/items/{item_id}/process")
async def process_item(item_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_data, item_id)
    return {"message": "Item processing started"}
This basic approach provides visibility through logs but lacks structured tracking for multiple tasks.
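If your logs feed into an aggregator, structured output is much easier to parse than free-form strings. Here is a minimal JSON-formatter sketch using only the standard library; the field names are illustrative rather than any fixed convention:
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Pick up task context passed via logger.info(..., extra={...})
        for key in ("task_id", "item_id"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)

# Usage: logger.info("Task started", extra={"item_id": 42})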
Creating a Task Registry
To get better insights, let's implement a simple task registry to track task status:
import time
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
app = FastAPI()
class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running" 
    COMPLETED = "completed"
    FAILED = "failed"
class TaskInfo(BaseModel):
    id: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    result: Any = None
    error: Optional[str] = None
# Our in-memory task registry
task_registry: Dict[str, TaskInfo] = {}
def run_task(task_id: str, seconds: int):
    # Update status to running
    update_task_status(task_id, TaskStatus.RUNNING)
    
    try:
        # Simulate work
        time.sleep(seconds)
        
        # Update task with results
        update_task_status(
            task_id, 
            TaskStatus.COMPLETED,
            result=f"Task completed after {seconds} seconds"
        )
    except Exception as e:
        # Update task with error
        update_task_status(
            task_id,
            TaskStatus.FAILED,
            error=str(e)
        )
def update_task_status(task_id: str, status: TaskStatus, result=None, error=None):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        if result is not None:
            task.result = result
        if error is not None:
            task.error = error
@app.post("/tasks/", response_model=TaskInfo)
async def create_task(background_tasks: BackgroundTasks, seconds: int = 10):
    # Create a unique ID for this task
    task_id = str(uuid.uuid4())
    
    # Register the task
    task_info = TaskInfo(
        id=task_id,
        status=TaskStatus.PENDING,
        created_at=datetime.now(),
        updated_at=datetime.now()
    )
    task_registry[task_id] = task_info
    
    # Start the background task
    background_tasks.add_task(run_task, task_id, seconds)
    
    return task_info
@app.get("/tasks/{task_id}", response_model=TaskInfo)
async def get_task(task_id: str):
    if task_id not in task_registry:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_registry[task_id]
@app.get("/tasks/")
async def list_tasks():
    return list(task_registry.values())
With this implementation:
- Each task gets a unique ID
- Tasks progress through defined states (pending, running, completed, failed)
- We track creation and update times
- We capture results or error messages
- We can query task status via API endpoints
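To see the registry in action, here's a hypothetical client session using the requests library, assuming the app is served by uvicorn at http://localhost:8000:
import time
import requests

BASE = "http://localhost:8000"

# Start a task that runs for 5 seconds
task = requests.post(f"{BASE}/tasks/", params={"seconds": 5}).json()
print(task["status"])  # "pending"

# Poll until the task reaches a terminal state
while task["status"] not in ("completed", "failed"):
    time.sleep(1)
    task = requests.get(f"{BASE}/tasks/{task['id']}").json()

print(task["status"], task.get("result") or task.get("error"))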
Adding Progress Tracking
For long-running tasks, tracking progress percentage can be valuable. Add a progress field to TaskInfo and generalize update_task_status to forward extra fields via **kwargs (without this, the progress= calls below would raise a TypeError):
class TaskInfo(BaseModel):
    id: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    progress: float = 0.0  # Progress from 0 to 100
    result: Any = None
    error: Optional[str] = None
def update_task_status(task_id: str, status: TaskStatus, **kwargs):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        # Copy any recognized extra fields (progress, result, error)
        for key, value in kwargs.items():
            if hasattr(task, key):
                setattr(task, key, value)
def process_large_data(task_id: str, items: int):
    update_task_status(task_id, TaskStatus.RUNNING)
    
    try:
        for i in range(items):
            # Do some work
            time.sleep(0.5)
            
            # Update progress
            progress = (i + 1) / items * 100
            update_task_status(
                task_id, 
                TaskStatus.RUNNING,
                progress=progress
            )
        
        update_task_status(
            task_id, 
            TaskStatus.COMPLETED,
            progress=100.0,
            result=f"Processed {items} items successfully"
        )
    except Exception as e:
        update_task_status(
            task_id,
            TaskStatus.FAILED,
            error=str(e)
        )
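Writing to the registry on every item can be noisy for large batches. A common refinement is to throttle updates; this small helper (an illustrative addition, not part of the code above) reports only every N items:
def report_progress(task_id: str, done: int, total: int, every: int = 10):
    # Update the registry only every `every` items, and always on the last one
    if done % every == 0 or done == total:
        update_task_status(
            task_id,
            TaskStatus.RUNNING,
            progress=done / total * 100,
        )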
Implementing a More Robust Solution
For production use, consider these enhancements:
1. Using Redis for Task Storage
In-memory storage is lost on restart and isn't shared between worker processes. Redis provides a better store for task state:
import json
import time
from datetime import datetime
import redis
from fastapi import FastAPI, BackgroundTasks, HTTPException
app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def store_task_info(task_id: str, task_info: dict):
    redis_client.set(f"task:{task_id}", json.dumps(task_info))
    # Set expiration time (e.g., keep task info for 24 hours)
    redis_client.expire(f"task:{task_id}", 86400)
def get_task_info(task_id: str):
    data = redis_client.get(f"task:{task_id}")
    if data:
        return json.loads(data)
    return None
def run_background_task(task_id: str):
    # Update task to running
    task_info = get_task_info(task_id)
    if task_info is None:
        return  # record expired or was never stored
    task_info["status"] = "running"
    task_info["updated_at"] = str(datetime.now())
    store_task_info(task_id, task_info)
    
    try:
        # Do work...
        time.sleep(10)
        
        # Update to completed
        task_info = get_task_info(task_id)
        task_info["status"] = "completed"
        task_info["updated_at"] = str(datetime.now())
        task_info["result"] = "Task completed successfully"
        store_task_info(task_id, task_info)
    except Exception as e:
        # Update with error
        task_info = get_task_info(task_id)
        task_info["status"] = "failed"
        task_info["updated_at"] = str(datetime.now())
        task_info["error"] = str(e)
        store_task_info(task_id, task_info)
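To wire this into the API, the endpoints mirror the in-memory version. A minimal sketch, using the same dict layout as run_background_task above:
import uuid

@app.post("/tasks/")
async def create_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    now = str(datetime.now())
    store_task_info(task_id, {
        "id": task_id,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
    })
    background_tasks.add_task(run_background_task, task_id)
    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    task_info = get_task_info(task_id)
    if task_info is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_info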
2. Using Celery for Advanced Monitoring
For even more advanced monitoring, consider integrating Celery with FastAPI:
from fastapi import FastAPI
from celery import Celery
from celery.result import AsyncResult
# Configure Celery
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)
app = FastAPI()
@celery_app.task
def process_data(data):
    # Long-running process here
    return {"result": "Data processed successfully"}
@app.post("/process/")
async def start_process(data: dict):
    # Start a Celery task
    task = process_data.delay(data)
    return {"task_id": task.id}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    # Get task status from Celery
    task_result = AsyncResult(task_id, app=celery_app)
    
    # Return task status information
    result = {
        "task_id": task_id,
        "status": task_result.status
    }
    
    # Include result or error if available
    if task_result.ready():
        if task_result.successful():
            result["result"] = task_result.get()
        else:
            # If the task failed, get the exception info
            result["error"] = str(task_result.result)
    
    return result
Celery provides a comprehensive task monitoring system including:
- Task status tracking
- Result storage
- Automatic retries (see the sketch after this list)
- Error handling
- Task inspection and statistics
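For example, automatic retries are a single decorator change. A sketch with illustrative parameters (retry on ConnectionError with exponential backoff, at most three attempts):
import random

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError,),  # re-queue automatically on these exceptions
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=3,
)
def flaky_task(self):
    # Simulate a transient failure that may succeed on retry
    if random.random() < 0.5:
        raise ConnectionError("simulated transient failure")
    return {"result": "succeeded"}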
Implementing a WebSocket for Real-time Monitoring
For real-time monitoring, we can use WebSockets to push task updates to clients. This snippet builds on the in-memory task registry from earlier:
import asyncio
from datetime import datetime
from typing import Dict, List, Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.encoders import jsonable_encoder
app = FastAPI()
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}
    async def connect(self, websocket: WebSocket, task_id: str):
        await websocket.accept()
        if task_id not in self.active_connections:
            self.active_connections[task_id] = []
        self.active_connections[task_id].append(websocket)
    def disconnect(self, websocket: WebSocket, task_id: str):
        if task_id in self.active_connections:
            self.active_connections[task_id].remove(websocket)
    async def send_task_update(self, task_id: str, data: dict):
        if task_id in self.active_connections:
            for connection in self.active_connections[task_id]:
                await connection.send_json(data)
manager = ConnectionManager()
# Capture the main event loop at startup so worker threads can schedule
# coroutines on it. Sync background tasks run in FastAPI's threadpool,
# where asyncio.create_task would fail with "no running event loop".
main_loop: Optional[asyncio.AbstractEventLoop] = None

@app.on_event("startup")
async def capture_event_loop():
    global main_loop
    main_loop = asyncio.get_running_loop()

# When task status changes, notify connected clients
def update_task_status(task_id: str, status: TaskStatus, **kwargs):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        
        # Update other fields
        for key, value in kwargs.items():
            if hasattr(task, key):
                setattr(task, key, value)
        
        # Notify clients; jsonable_encoder converts datetimes to strings
        # so send_json can serialize the payload
        if main_loop is not None:
            asyncio.run_coroutine_threadsafe(
                manager.send_task_update(task_id, jsonable_encoder(task)),
                main_loop,
            )
@app.websocket("/ws/tasks/{task_id}")
async def websocket_task_endpoint(websocket: WebSocket, task_id: str):
    await manager.connect(websocket, task_id)
    try:
        # Send initial task status (jsonable_encoder handles the datetimes)
        if task_id in task_registry:
            await websocket.send_json(jsonable_encoder(task_registry[task_id]))
        
        # Keep connection alive
        while True:
            # Wait for any message from client to keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket, task_id)
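A client can then subscribe to live updates for a task. Here's a hypothetical consumer using the third-party websockets package (pip install websockets), assuming the app runs locally on port 8000:
import asyncio
import json
import websockets

async def watch_task(task_id: str):
    uri = f"ws://localhost:8000/ws/tasks/{task_id}"
    async with websockets.connect(uri) as ws:
        while True:
            update = json.loads(await ws.recv())
            print(f"{update['status']} ({update.get('progress', 0):.0f}%)")
            if update["status"] in ("completed", "failed"):
                break

asyncio.run(watch_task("your-task-id-here"))  # placeholder task ID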
Best Practices for Task Monitoring
- Use unique task IDs: Each task should have a unique identifier for tracking.
- Include timestamps: Track when tasks are created, started, and completed.
- Implement structured logging: Use a consistent format for easy parsing.
- Add progress reporting: For long-running tasks, report percentage completion.
- Store task history: Keep a record of past tasks for analysis.
- Set up alerts: Get notified when tasks fail or take too long.
- Implement timeout handling: Prevent tasks from running indefinitely (see the sketch after this list).
- Add resource usage monitoring: Track memory and CPU usage.
- Create a monitoring dashboard: Visualize task performance and status.
- Clean up completed tasks: Implement a retention policy to clear old task data.
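For timeout handling specifically, one stdlib-only approach is to run the work in a worker thread and stop waiting past a deadline. A minimal sketch built on the registry helpers from earlier (the 30-second limit is arbitrary):
import concurrent.futures

def run_with_timeout(task_id: str, work, timeout_seconds: float = 30.0):
    """Run work() in a worker thread; mark the task failed past the deadline.

    Note: Python cannot forcibly kill a thread, so the underlying work may
    keep running; the timeout only bounds how long we wait before reporting.
    """
    update_task_status(task_id, TaskStatus.RUNNING)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(work)
    try:
        result = future.result(timeout=timeout_seconds)
        update_task_status(task_id, TaskStatus.COMPLETED, result=result)
    except concurrent.futures.TimeoutError:
        update_task_status(
            task_id,
            TaskStatus.FAILED,
            error=f"Task exceeded {timeout_seconds}s timeout",
        )
    finally:
        executor.shutdown(wait=False)  # don't block on a runaway worker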
Summary
Monitoring background tasks is essential for maintaining robust FastAPI applications. In this tutorial, we've explored:
- Basic logging for background tasks
- Creating a task registry for status tracking
- Implementing progress reporting
- Using Redis for persistent task storage
- Leveraging Celery for advanced monitoring
- Setting up WebSockets for real-time monitoring
By implementing proper monitoring, you'll gain visibility into your background processes, enabling faster debugging and more reliable applications.
Further Resources
- FastAPI official documentation on background tasks
- Celery documentation
- Redis documentation
- Prometheus for metrics collection
- Grafana for visualization
Exercises
- Enhance the task registry to include execution time metrics.
- Create a simple web dashboard that displays active tasks and their status.
- Implement a notification system that alerts when tasks fail.
- Add the ability to cancel running background tasks.
- Integrate with Prometheus to collect metrics on task performance.