Skip to content

FastAPI Integration

AuthFort provides first-class FastAPI integration via routers and dependencies.

main.py
from contextlib import asynccontextmanager
from authfort import AuthFort, CookieConfig
from fastapi import FastAPI, Depends

# Shared AuthFort instance; the routers and dependencies on this page hang off it.
# The URL points at a SQLite file (auth.db) accessed through the aiosqlite driver.
auth = AuthFort(
    database_url="sqlite+aiosqlite:///auth.db",
    cookie=CookieConfig(),  # CookieConfig built with no arguments
)

@asynccontextmanager
async def lifespan(app):
    """Lifespan hook that calls ``auth.dispose()`` when the app shuts down.

    Nothing is set up before the ``yield``; the application runs while the
    generator is suspended there.

    Args:
        app: The application instance passed in by the framework (unused).
    """
    try:
        yield
    finally:
        # Dispose even when the lifespan exits through an exception or
        # cancellation instead of a clean shutdown.
        await auth.dispose()

# Register the lifespan handler so auth.dispose() runs at shutdown.
app = FastAPI(lifespan=lifespan)
# Auth endpoints (signup, login, refresh, logout, me) are mounted under /auth.
app.include_router(auth.fastapi_router(), prefix="/auth")
# No prefix here: the JWKS router serves /.well-known/jwks.json from the root.
app.include_router(auth.jwks_router())
from contextlib import asynccontextmanager
from authfort import AuthFort, CookieConfig
from fastapi import FastAPI, Depends

# Shared AuthFort instance; the routers and dependencies on this page hang off it.
# The URL points at a SQLite file (auth.db) accessed through the aiosqlite driver.
auth = AuthFort(
    database_url="sqlite+aiosqlite:///auth.db",
    cookie=CookieConfig(),  # CookieConfig built with no arguments
)

@asynccontextmanager
async def lifespan(app):
    """Lifespan hook that calls ``auth.dispose()`` when the app shuts down.

    Nothing is set up before the ``yield``; the application runs while the
    generator is suspended there.

    Args:
        app: The application instance passed in by the framework (unused).
    """
    try:
        yield
    finally:
        # Dispose even when the lifespan exits through an exception or
        # cancellation instead of a clean shutdown.
        await auth.dispose()

# Register the lifespan handler so auth.dispose() runs at shutdown.
app = FastAPI(lifespan=lifespan)
# Auth endpoints (signup, login, refresh, logout, me) are mounted under /auth.
app.include_router(auth.fastapi_router(), prefix="/auth")
# No prefix here: the JWKS router serves /.well-known/jwks.json from the root.
app.include_router(auth.jwks_router())

Before starting the app, run migrations once:

Terminal window
authfort migrate --database-url "sqlite+aiosqlite:///auth.db"

This gives you:

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /auth/signup | Create a new user |
| POST | /auth/login | Authenticate and get tokens |
| POST | /auth/refresh | Refresh access token |
| POST | /auth/logout | Revoke refresh token |
| GET | /auth/me | Get current user info |
| GET | /.well-known/jwks.json | Public signing keys |

The auth.current_user dependency returns the authenticated user as a UserResponse with the fields id, email, name, email_verified, avatar_url, phone, banned, roles, created_at, and session_id.

from authfort import UserResponse
from fastapi import Depends

@app.get("/api/profile")
async def profile(user: UserResponse = Depends(auth.current_user)):
    """Return a summary of the authenticated user's account.

    ``id`` and ``session_id`` are passed through ``str()``; the remaining
    fields are returned as they appear on the ``UserResponse``.
    """
    user_id = str(user.id)
    session_id = str(user.session_id)
    summary = {
        "id": user_id,
        "email": user.email,
        "name": user.name,
        "roles": user.roles,
        "session_id": session_id,
    }
    return summary
from authfort import UserResponse
from fastapi import Depends

@app.get("/api/profile")
async def profile(user: UserResponse = Depends(auth.current_user)):
    """Return a summary of the authenticated user's account.

    ``id`` and ``session_id`` are passed through ``str()``; the remaining
    fields are returned as they appear on the ``UserResponse``.
    """
    user_id = str(user.id)
    session_id = str(user.session_id)
    summary = {
        "id": user_id,
        "email": user.email,
        "name": user.name,
        "roles": user.roles,
        "session_id": session_id,
    }
    return summary

If no valid token is found, the request is rejected with 401 Unauthorized.

The auth.require_role dependency verifies that the user has a specific role:

@app.get("/api/admin")
async def admin_panel(user: UserResponse = Depends(auth.require_role("admin"))):
    """Greet a user admitted by the ``require_role("admin")`` dependency."""
    greeting = f"Welcome, {user.email}"
    return {"message": greeting}
@app.get("/api/admin")
async def admin_panel(user: UserResponse = Depends(auth.require_role("admin"))):
    """Greet a user admitted by the ``require_role("admin")`` dependency."""
    greeting = f"Welcome, {user.email}"
    return {"message": greeting}

Returns 403 Forbidden if the user doesn’t have the required role.

Pass a list to require any one of the roles:

@app.get("/api/manage")
async def manage(
    user: UserResponse = Depends(auth.require_role(["admin", "editor"])),
):
    """Return the roles of a user admitted by the admin/editor role check."""
    granted_roles = user.roles
    return {"roles": granted_roles}
@app.get("/api/manage")
async def manage(
    user: UserResponse = Depends(auth.require_role(["admin", "editor"])),
):
    """Return the roles of a user admitted by the admin/editor role check."""
    granted_roles = user.roles
    return {"roles": granted_roles}
@app.get("/api/dashboard")
async def dashboard(user: UserResponse = Depends(auth.current_user)):
    """Return the current user's email and roles.

    Shows where role-specific logic can be added inside a handler that only
    requires authentication.
    """
    is_admin = "admin" in user.roles
    if is_admin:
        # Placeholder: admin-specific data would be gathered here.
        pass
    response = {"email": user.email, "roles": user.roles}
    return response
@app.get("/api/dashboard")
async def dashboard(user: UserResponse = Depends(auth.current_user)):
    """Return the current user's email and roles.

    Shows where role-specific logic can be added inside a handler that only
    requires authentication.
    """
    is_admin = "admin" in user.roles
    if is_admin:
        # Placeholder: admin-specific data would be gathered here.
        pass
    response = {"email": user.email, "roles": user.roles}
    return response
@app.post("/api/sign-out-others")
async def sign_out_others(user: UserResponse = Depends(auth.current_user)):
    """Revoke the user's sessions, passing the current one as ``exclude``."""
    current_session = user.session_id
    await auth.revoke_all_sessions(user.id, exclude=current_session)
    confirmation = {"message": "All other sessions revoked"}
    return confirmation
@app.post("/api/sign-out-others")
async def sign_out_others(user: UserResponse = Depends(auth.current_user)):
    """Revoke the user's sessions, passing the current one as ``exclude``."""
    current_session = user.session_id
    await auth.revoke_all_sessions(user.id, exclude=current_session)
    confirmation = {"message": "All other sessions revoked"}
    return confirmation

The session_id comes from the sid claim in the JWT.

@app.post("/api/forgot-password")
async def forgot_password(email: str):
    """Start a password reset for ``email`` and email the reset link.

    The same response is returned whether or not a token was created, so
    the endpoint does not reveal which addresses are registered.
    """
    # Identical reply on both paths to prevent enumeration.
    acknowledgement = {"message": "If that email exists, a reset link was sent."}

    token = await auth.create_password_reset_token(email)
    if not token:
        return acknowledgement

    reset_link = f"https://myapp.com/reset?token={token}"
    await send_email(to=email, subject="Reset your password", body=reset_link)
    return acknowledgement
@app.post("/api/forgot-password")
async def forgot_password(email: str):
    """Start a password reset for ``email`` and email the reset link.

    The same response is returned whether or not a token was created, so
    the endpoint does not reveal which addresses are registered.
    """
    # Identical reply on both paths to prevent enumeration.
    acknowledgement = {"message": "If that email exists, a reset link was sent."}

    token = await auth.create_password_reset_token(email)
    if not token:
        return acknowledgement

    reset_link = f"https://myapp.com/reset?token={token}"
    await send_email(to=email, subject="Reset your password", body=reset_link)
    return acknowledgement
@app.post("/api/reset-password")
async def reset_password(token: str, new_password: str):
    """Set a new password using a previously issued reset token.

    Args:
        token: The reset token sent to the user.
        new_password: The password to set.

    Raises:
        HTTPException: 400 when ``auth.reset_password`` reports failure.
    """
    # The imports shown for this page bring in only FastAPI and Depends from
    # fastapi, so import HTTPException here to keep the example runnable.
    from fastapi import HTTPException

    # NOTE(review): FastAPI reads plain `str` parameters like these from the
    # query string; for real deployments consider a request-body model so the
    # token and password do not end up in URLs and access logs.
    success = await auth.reset_password(token, new_password)
    if not success:
        raise HTTPException(status_code=400, detail="Invalid or expired token")
    return {"message": "Password reset successfully"}
@app.post("/api/reset-password")
async def reset_password(token: str, new_password: str):
    """Set a new password using a previously issued reset token.

    Args:
        token: The reset token sent to the user.
        new_password: The password to set.

    Raises:
        HTTPException: 400 when ``auth.reset_password`` reports failure.
    """
    # The imports shown for this page bring in only FastAPI and Depends from
    # fastapi, so import HTTPException here to keep the example runnable.
    from fastapi import HTTPException

    # NOTE(review): FastAPI reads plain `str` parameters like these from the
    # query string; for real deployments consider a request-body model so the
    # token and password do not end up in URLs and access logs.
    success = await auth.reset_password(token, new_password)
    if not success:
        raise HTTPException(status_code=400, detail="Invalid or expired token")
    return {"message": "Password reset successfully"}
@app.post("/api/change-password")
async def change_password(
    old_password: str,
    new_password: str,
    user: UserResponse = Depends(auth.current_user),
):
    """Change the authenticated user's password.

    Forwards the user's id together with the old and new passwords to
    ``auth.change_password`` and returns a confirmation message.
    """
    user_id = user.id
    await auth.change_password(user_id, old_password, new_password)
    confirmation = {"message": "Password changed"}
    return confirmation
@app.post("/api/change-password")
async def change_password(
    old_password: str,
    new_password: str,
    user: UserResponse = Depends(auth.current_user),
):
    """Change the authenticated user's password.

    Forwards the user's id together with the old and new passwords to
    ``auth.change_password`` and returns a confirmation message.
    """
    user_id = user.id
    await auth.change_password(user_id, old_password, new_password)
    confirmation = {"message": "Password changed"}
    return confirmation

AuthFort provides programmatic methods for user management — build your own admin endpoints:

@app.get("/admin/users")
async def admin_list_users(
    user: UserResponse = Depends(auth.require_role("admin")),
    query: str | None = None,
    limit: int = 20,
    offset: int = 0,
):
    """List users; access is gated by the ``require_role("admin")`` dependency.

    Args:
        user: The caller, resolved by the role dependency (not used further).
        query: Optional filter forwarded to ``auth.list_users``.
        limit: Page size forwarded to ``auth.list_users`` (default 20).
        offset: Page offset forwarded to ``auth.list_users`` (default 0).

    Returns:
        Whatever ``auth.list_users`` returns.
    """
    return await auth.list_users(query=query, limit=limit, offset=offset)
@app.get("/admin/users/{user_id}")
async def admin_get_user(
    user_id: str,
    user: UserResponse = Depends(auth.require_role("admin")),
):
    """Fetch one user by id; access is gated by ``require_role("admin")``.

    Args:
        user_id: Identifier taken from the URL path.
        user: The caller, resolved by the role dependency (not used further).

    Returns:
        Whatever ``auth.get_user`` returns for ``user_id``.
    """
    return await auth.get_user(user_id)
@app.delete("/admin/users/{user_id}")
async def admin_delete_user(
    user_id: str,
    user: UserResponse = Depends(auth.require_role("admin")),
):
    """Delete one user by id; access is gated by ``require_role("admin")``.

    Args:
        user_id: Identifier taken from the URL path.
        user: The caller, resolved by the role dependency (not used further).

    Returns:
        ``{"deleted": True}`` once ``auth.delete_user`` has completed.
    """
    await auth.delete_user(user_id)
    return {"deleted": True}

See User Management for filtering, pagination, and all available methods.