Skip to content

Events & Hooks

AuthFort emits events for every authentication action. Use hooks to react to these events — send emails, log audit trails, update analytics, sync with external systems.

@auth.on("user_created")
async def on_signup(event):
    await send_welcome_email(event.email)

@auth.on("login_failed")
async def on_failed_login(event):
    await log_failed_attempt(event.email, event.ip_address, event.reason)
@auth.on("user_created")
async def on_signup(event):
    await send_welcome_email(event.email)

@auth.on("login_failed")
async def on_failed_login(event):
    await log_failed_attempt(event.email, event.ip_address, event.reason)
async def on_signup(event):
    await send_welcome_email(event.email)

auth.add_hook("user_created", on_signup)
async def on_signup(event):
    await send_welcome_email(event.email)

auth.add_hook("user_created", on_signup)

Both approaches are equivalent. The decorator is syntactic sugar for add_hook().

Event types are importable from authfort.events for type-safe handlers:

from authfort import AuthFort
from authfort.events import UserCreated, LoginFailed, UserUpdated, UserDeleted, RateLimitExceeded

auth = AuthFort(database_url="...")

@auth.on("user_created")
async def on_signup(event: UserCreated):
    await send_welcome_email(event.email)

@auth.on("login_failed")
async def on_failed_login(event: LoginFailed):
    await log_failed_attempt(event.email, event.ip_address, event.reason)

@auth.on("user_updated")
async def on_profile_update(event: UserUpdated):
    await sync_profile(event.user_id, event.fields)
from authfort import AuthFort
from authfort.events import UserCreated, LoginFailed, UserUpdated, UserDeleted, RateLimitExceeded

auth = AuthFort(database_url="...")

@auth.on("user_created")
async def on_signup(event: UserCreated):
    await send_welcome_email(event.email)

@auth.on("login_failed")
async def on_failed_login(event: LoginFailed):
    await log_failed_attempt(event.email, event.ip_address, event.reason)

@auth.on("user_updated")
async def on_profile_update(event: UserUpdated):
    await sync_profile(event.user_id, event.fields)
EventPayload Fields
user_createduser_id, email, name, provider, timestamp
user_updateduser_id, fields, timestamp
user_deleteduser_id, email, timestamp
user_banneduser_id, timestamp
user_unbanneduser_id, timestamp
EventPayload Fields
loginuser_id, email, provider, ip_address, user_agent, timestamp
login_failedemail, reason, ip_address, user_agent, timestamp
logoutuser_id, timestamp
token_refresheduser_id, ip_address, user_agent, timestamp
EventPayload Fields
role_addeduser_id, role, timestamp
role_removeduser_id, role, timestamp
EventPayload Fields
session_revokeduser_id, session_id, revoke_all, timestamp
EventPayload Fields
oauth_linkuser_id, email, provider, timestamp
EventPayload Fields
password_reset_requesteduser_id, email, timestamp
password_resetuser_id, timestamp
password_changeduser_id, timestamp
EventPayload Fields
email_verification_requesteduser_id, email, token, timestamp
email_verifieduser_id, email, timestamp
EventPayload Fields
magic_link_requesteduser_id, email, token, timestamp
magic_link_loginuser_id, email, timestamp
email_otp_requesteduser_id, email, code, timestamp
email_otp_loginuser_id, email, timestamp
EventPayload Fields
key_rotatedold_kid, new_kid, timestamp
EventPayload Fields
rate_limit_exceededendpoint, ip_address, email, limit, key_type, timestamp
@auth.on("login")
async def audit_login(event):
    await db.insert("audit_log", {
        "action": "login",
        "user_id": event.user_id,
        "ip": event.ip_address,
        "user_agent": event.user_agent,
        "timestamp": event.timestamp,
    })
@auth.on("login")
async def audit_login(event):
    await db.insert("audit_log", {
        "action": "login",
        "user_id": event.user_id,
        "ip": event.ip_address,
        "user_agent": event.user_agent,
        "timestamp": event.timestamp,
    })
from collections import defaultdict

failed_attempts = defaultdict(int)

@auth.on("login_failed")
async def track_failures(event):
    failed_attempts[event.email] += 1
    if failed_attempts[event.email] >= 5:
        await notify_admin(f"Multiple failed logins for {event.email}")
from collections import defaultdict

failed_attempts = defaultdict(int)

@auth.on("login_failed")
async def track_failures(event):
    failed_attempts[event.email] += 1
    if failed_attempts[event.email] >= 5:
        await notify_admin(f"Multiple failed logins for {event.email}")
@auth.on("user_created")
async def welcome(event):
    if event.provider == "email":
        await send_welcome_email(event.email, event.name)
@auth.on("user_created")
async def welcome(event):
    if event.provider == "email":
        await send_welcome_email(event.email, event.name)
@auth.on("magic_link_requested")
async def send_magic_link(event):
    await send_email(
        to=event.email,
        subject="Your login link",
        body=f"https://myapp.com/auth/magic?token={event.token}",
    )
@auth.on("magic_link_requested")
async def send_magic_link(event):
    await send_email(
        to=event.email,
        subject="Your login link",
        body=f"https://myapp.com/auth/magic?token={event.token}",
    )

You can register multiple hooks for the same event. They run concurrently:

@auth.on("login")
async def hook_1(event):
    await update_last_login(event.user_id)

@auth.on("login")
async def hook_2(event):
    await send_login_notification(event.email)
@auth.on("login")
async def hook_1(event):
    await update_last_login(event.user_id)

@auth.on("login")
async def hook_2(event):
    await send_login_notification(event.email)