FastAPI Integration
AuthFort provides first-class FastAPI integration via routers and dependencies.
main.py
from contextlib import asynccontextmanager
from authfort import AuthFort, CookieConfig
from fastapi import FastAPI, Depends
auth = AuthFort(
database_url="sqlite+aiosqlite:///auth.db",
cookie=CookieConfig(),
)
@asynccontextmanager
async def lifespan(app):
yield
await auth.dispose()
app = FastAPI(lifespan=lifespan)
app.include_router(auth.fastapi_router(), prefix="/auth")
app.include_router(auth.jwks_router())

from contextlib import asynccontextmanager
from authfort import AuthFort, CookieConfig
from fastapi import FastAPI, Depends
auth = AuthFort(
database_url="sqlite+aiosqlite:///auth.db",
cookie=CookieConfig(),
)
@asynccontextmanager
async def lifespan(app):
yield
await auth.dispose()
app = FastAPI(lifespan=lifespan)
app.include_router(auth.fastapi_router(), prefix="/auth")
app.include_router(auth.jwks_router())

Before starting the app, run migrations once:

authfort migrate --database-url "sqlite+aiosqlite:///auth.db"

This gives you:
| Method | Endpoint | Description |
|---|---|---|
| POST | /auth/signup | Create a new user |
| POST | /auth/login | Authenticate and get tokens |
| POST | /auth/refresh | Refresh access token |
| POST | /auth/logout | Revoke refresh token |
| GET | /auth/me | Get current user info |
| GET | /.well-known/jwks.json | Public signing keys |
Protecting Routes

current_user

Returns the authenticated user as a UserResponse with: id, email, name, email_verified, avatar_url, phone, banned, roles, created_at, and session_id.
from authfort import UserResponse
from fastapi import Depends
@app.get("/api/profile")
async def profile(user: UserResponse = Depends(auth.current_user)):
return {
"id": str(user.id),
"email": user.email,
"name": user.name,
"roles": user.roles,
"session_id": str(user.session_id),
}

from authfort import UserResponse
from fastapi import Depends
@app.get("/api/profile")
async def profile(user: UserResponse = Depends(auth.current_user)):
return {
"id": str(user.id),
"email": user.email,
"name": user.name,
"roles": user.roles,
"session_id": str(user.session_id),
}

If no valid token is found, the request is rejected with 401 Unauthorized.

require_role

Verifies the user has a specific role:
@app.get("/api/admin")
async def admin_panel(user: UserResponse = Depends(auth.require_role("admin"))):
return {"message": f"Welcome, {user.email}"}

@app.get("/api/admin")
async def admin_panel(user: UserResponse = Depends(auth.require_role("admin"))):
return {"message": f"Welcome, {user.email}"}

Returns 403 Forbidden if the user doesn’t have the required role.

Multiple Roles

Pass a list to require any one of the roles:
@app.get("/api/manage")
async def manage(
user: UserResponse = Depends(auth.require_role(["admin", "editor"])),
):
return {"roles": user.roles}

@app.get("/api/manage")
async def manage(
user: UserResponse = Depends(auth.require_role(["admin", "editor"])),
):
return {"roles": user.roles}

Checking Roles in Code

@app.get("/api/dashboard")
async def dashboard(user: UserResponse = Depends(auth.current_user)):
if "admin" in user.roles:
# Show admin-specific data
pass
return {"email": user.email, "roles": user.roles}

@app.get("/api/dashboard")
async def dashboard(user: UserResponse = Depends(auth.current_user)):
if "admin" in user.roles:
# Show admin-specific data
pass
return {"email": user.email, "roles": user.roles}

Common Patterns

Sign Out Other Devices

@app.post("/api/sign-out-others")
async def sign_out_others(user: UserResponse = Depends(auth.current_user)):
await auth.revoke_all_sessions(user.id, exclude=user.session_id)
return {"message": "All other sessions revoked"}

@app.post("/api/sign-out-others")
async def sign_out_others(user: UserResponse = Depends(auth.current_user)):
await auth.revoke_all_sessions(user.id, exclude=user.session_id)
return {"message": "All other sessions revoked"}

The session_id comes from the sid claim in the JWT.

Password Reset Flow

@app.post("/api/forgot-password")
async def forgot_password(email: str):
token = await auth.create_password_reset_token(email)
if token:
await send_email(
to=email,
subject="Reset your password",
body=f"https://myapp.com/reset?token={token}",
)
# Always return success to prevent enumeration
return {"message": "If that email exists, a reset link was sent."}

@app.post("/api/forgot-password")
async def forgot_password(email: str):
token = await auth.create_password_reset_token(email)
if token:
await send_email(
to=email,
subject="Reset your password",
body=f"https://myapp.com/reset?token={token}",
)
# Always return success to prevent enumeration
return {"message": "If that email exists, a reset link was sent."}

@app.post("/api/reset-password")
async def reset_password(token: str, new_password: str):
success = await auth.reset_password(token, new_password)
if not success:
raise HTTPException(400, "Invalid or expired token")
return {"message": "Password reset successfully"}

@app.post("/api/reset-password")
async def reset_password(token: str, new_password: str):
success = await auth.reset_password(token, new_password)
if not success:
raise HTTPException(400, "Invalid or expired token")
return {"message": "Password reset successfully"}

Change Password

@app.post("/api/change-password")
async def change_password(
old_password: str,
new_password: str,
user: UserResponse = Depends(auth.current_user),
):
await auth.change_password(user.id, old_password, new_password)
return {"message": "Password changed"}

@app.post("/api/change-password")
async def change_password(
old_password: str,
new_password: str,
user: UserResponse = Depends(auth.current_user),
):
await auth.change_password(user.id, old_password, new_password)
return {"message": "Password changed"}

Admin User Management

AuthFort provides programmatic methods for user management — build your own admin endpoints:
@app.get("/admin/users")
async def admin_list_users(
    user=Depends(auth.require_role("admin")),
    query: str | None = None,
    limit: int = 20,
    offset: int = 0,
):
    return await auth.list_users(query=query, limit=limit, offset=offset)

@app.get("/admin/users/{user_id}")
async def admin_get_user(
    user_id: str,
    user=Depends(auth.require_role("admin")),
):
    return await auth.get_user(user_id)

@app.delete("/admin/users/{user_id}")
async def admin_delete_user(
    user_id: str,
    user=Depends(auth.require_role("admin")),
):
    await auth.delete_user(user_id)
    return {"deleted": True}

See User Management for filtering, pagination, and all available methods.