Skip to content
Build an Authentication System with JWT (Step by Step)

Build an Authentication System with JWT (Step by Step)

DodaTech Updated Jun 20, 2026 9 min read

Build a complete authentication system from scratch with FastAPI featuring user registration, login with JWT access and refresh tokens, bcrypt password hashing, protected API routes, logout with token blacklisting, and SQLite persistence.

What You’ll Build

You’ll build a production-ready auth API where users register with email/password, receive JWT access tokens (short-lived) and refresh tokens (long-lived), access protected endpoints, and securely log out with token blacklisting. This same auth architecture secures Doda Browser’s sync service and Durga Antivirus Pro’s license validation.

Why Build Auth from Scratch?

Authentication is the gateway to every application. Building it from scratch teaches you password hashing (bcrypt/argon2), JWT structure and signing, refresh token rotation, blacklisting strategies, and security headers — knowledge you need whether you’re using Auth0, Firebase, or a custom solution. You’ll understand exactly what third-party auth providers do behind the scenes.

Prerequisites

Step 1: Project Setup

mkdir auth-system
cd auth-system
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart pydantic[email] sqlalchemy aiosqlite

Step 2: Database Models

# database.py
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timezone

SQLALCHEMY_DATABASE_URL = "sqlite:///./auth.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

class BlacklistedToken(Base):
    __tablename__ = "blacklisted_tokens"

    id = Column(Integer, primary_key=True, index=True)
    token_jti = Column(String, unique=True, index=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    blacklisted_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))

Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Step 3: Auth Utilities

# auth.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, EmailStr
from typing import Optional
import uuid

SECRET_KEY = "your-secret-key-change-in-production-min-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "access",
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "refresh",
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> Optional[dict]:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None

def get_token_jti(token: str) -> Optional[str]:
    payload = decode_token(token)
    return payload.get("jti") if payload else None

Step 4: Pydantic Schemas

# schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime

class UserCreate(BaseModel):
    email: EmailStr
    username: str
    password: str

class UserLogin(BaseModel):
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    id: int
    email: str
    username: str
    is_active: bool
    created_at: datetime

    class Config:
        from_attributes = True

class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

class RefreshRequest(BaseModel):
    refresh_token: str

class ChangePasswordRequest(BaseModel):
    current_password: str
    new_password: str

Step 5: API Routes

# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from datetime import datetime, timezone

from database import get_db, User, BlacklistedToken
from schemas import UserCreate, UserLogin, TokenResponse, RefreshRequest, UserResponse, ChangePasswordRequest
from auth import hash_password, verify_password, create_access_token, create_refresh_token, decode_token, get_token_jti

app = FastAPI(title="Auth API", version="1.0.0")
security = HTTPBearer()

def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    token = credentials.credentials
    payload = decode_token(token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid token")

    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type")

    jti = payload.get("jti")
    blacklisted = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
    if blacklisted:
        raise HTTPException(status_code=401, detail="Token has been revoked")

    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token payload")

    user = db.query(User).filter(User.id == int(user_id)).first()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    return user

@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    existing = db.query(User).filter(
        (User.email == user_data.email) | (User.username == user_data.username)
    ).first()
    if existing:
        raise HTTPException(status_code=409, detail="Email or username already registered")

    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hash_password(user_data.password),
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user

@app.post("/auth/login", response_model=TokenResponse)
def login(login_data: UserLogin, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == login_data.email).first()
    if not user or not verify_password(login_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    token_data = {"sub": str(user.id), "email": user.email, "username": user.username}
    return TokenResponse(
        access_token=create_access_token(token_data),
        refresh_token=create_refresh_token(token_data),
    )

@app.post("/auth/refresh", response_model=TokenResponse)
def refresh_token(refresh_data: RefreshRequest, db: Session = Depends(get_db)):
    payload = decode_token(refresh_data.refresh_token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")

    jti = payload.get("jti")
    blacklisted = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
    if blacklisted:
        raise HTTPException(status_code=401, detail="Refresh token has been revoked")

    # Blacklist the used refresh token (rotation)
    exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
    db.add(BlacklistedToken(token_jti=jti, expires_at=exp))
    db.commit()

    user_id = payload.get("sub")
    user = db.query(User).filter(User.id == int(user_id)).first()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    token_data = {"sub": str(user.id), "email": user.email, "username": user.username}
    return TokenResponse(
        access_token=create_access_token(token_data),
        refresh_token=create_refresh_token(token_data),
    )

@app.post("/auth/logout")
def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    access_payload = decode_token(credentials.credentials)
    if access_payload:
        jti = access_payload.get("jti")
        exp = datetime.fromtimestamp(access_payload["exp"], tz=timezone.utc)
        existing = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
        if not existing:
            db.add(BlacklistedToken(token_jti=jti, expires_at=exp))
            db.commit()

    return {"message": "Logged out successfully"}

@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_user)):
    return current_user

@app.post("/auth/change-password")
def change_password(
    req: ChangePasswordRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    if not verify_password(req.current_password, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Current password is incorrect")

    current_user.hashed_password = hash_password(req.new_password)
    db.commit()
    return {"message": "Password changed successfully"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Step 6: Run and Test

python main.py

Expected output:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8000

Test the API with curl:

# Register
curl -X POST http://localhost:8000/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","username":"testuser","password":"SecurePass123!"}'

# Expected output:
# {"id":1,"email":"test@example.com","username":"testuser","is_active":true,"created_at":"2026-06-20T..."}

# Login
curl -X POST http://localhost:8000/auth/login \
  -H "Content-Type: application/json" \
  -d '{"email":"test@example.com","password":"SecurePass123!"}'

# Expected output:
# {"access_token":"eyJhbGci...","refresh_token":"eyJhbGci...","token_type":"bearer"}

# Access protected endpoint (replace TOKEN with actual access token)
curl http://localhost:8000/auth/me \
  -H "Authorization: Bearer eyJhbGci..."

# Expected output:
# {"id":1,"email":"test@example.com","username":"testuser","is_active":true,"created_at":"2026-06-20T..."}

Architecture


sequenceDiagram
    participant Client as Client App
    participant API as FastAPI Auth API
    participant DB as SQLite Database

    Client->>API: POST /auth/register {email, username, password}
    API->>DB: Check uniqueness
    API->>DB: INSERT user (bcrypt hash)
    API-->>Client: 201 {user data}

    Client->>API: POST /auth/login {email, password}
    API->>DB: SELECT user by email
    API->>API: Verify password with bcrypt
    API->>API: Generate access_token (15min) + refresh_token (7d)
    API-->>Client: 200 {tokens}

    Client->>API: GET /auth/me (Bearer access_token)
    API->>DB: Check token not blacklisted
    API->>API: Decode JWT, extract user_id
    API->>DB: SELECT user by id
    API-->>Client: 200 {user data}

    Client->>API: POST /auth/refresh {refresh_token}
    API->>DB: Blacklist old refresh token
    API->>API: Generate new token pair
    API-->>Client: 200 {new tokens}

    Client->>API: POST /auth/logout (Bearer access_token)
    API->>DB: INSERT token_jti into blacklist
    API-->>Client: 200 {logged out}

Common Errors

1. “Not enough segments” JWT error Your token string is missing parts. JWTs have three segments separated by dots: header.payload.signature. If you copied only part of the token or it was truncated, the full token is required. Ensure the Authorization header uses the format Bearer <full_token>.

2. bcrypt “Invalid salt” error The hash_password and verify_password functions must use the same CryptContext instance. If you create separate instances in different modules, the hash formats might not match. Always import pwd_context from your auth module rather than creating a new one.

3. Token still works after logout Logout blacklists the access token’s JTI (JWT ID). But a new access token issued before the old one expires would still work. Token blacklisting is not retroactive — it only blocks the specific token that was blacklisted. For full logout security, keep the blacklist expiry short and rotate refresh tokens.

4. SQLite “database is locked” with concurrent requests SQLite doesn’t handle high-concurrency writes well. For development it’s fine, but for production with multiple users, switch to PostgreSQL. The SQLAlchemy setup abstracts the database — just change the connection URL in database.py.

Practice Questions

1. Why are access tokens short-lived (15 min) and refresh tokens long-lived (7 days)? Short-lived access tokens limit the damage if one is stolen — it expires quickly. The refresh token, stored more securely (HTTP-only cookie, not localStorage), can request new access tokens without re-authentication. If a refresh token is compromised, it can be rotated (old one blacklisted, new one issued).

2. How does token blacklisting work? The JTI (unique ID) of each token is stored in the blacklisted_tokens table when a user logs out. On every authenticated request, the server checks if the token’s JTI is in the blacklist. Expired blacklist entries are cleaned up by a background job to prevent table bloat.

3. What’s the difference between passlib’s bcrypt and Django’s PBKDF2? bcrypt is adaptive — it includes a cost factor that makes hashing intentionally slow (tunable, default 12 rounds). PBKDF2 (used by Django) also has iterations. Both are resistant to GPU/ASIC cracking. bcrypt is preferred for new projects because it handles salt and cost in the hash string automatically.

4. Challenge: Add email verification Add an is_verified boolean field. On registration, generate a signed token (JWT with email, 24h expiry) and send a verification link. Create a GET /auth/verify?token=... endpoint that marks the user as verified. Require is_verified=True before the user can access certain endpoints.

5. Challenge: Implement rate limiting Add rate limiting to the login endpoint to prevent brute-force attacks. Use a Redis-backed rate limiter or an in-memory dictionary with timestamps per IP. Allow 5 login attempts per minute per IP. After 5 failures, return 429 Too Many Requests with a Retry-After header.

FAQ

Should I store JWT tokens in localStorage or cookies?
Store access tokens in memory (variable), refresh tokens in HTTP-only cookies with Secure, SameSite=Strict, and HttpOnly flags. localStorage is accessible to JavaScript (XSS risk). Cookies with HttpOnly are not accessible to JS, preventing token theft via XSS attacks.
How do I handle token expiration on the client?
Intercept 401 responses in the client. When a request returns 401, try refreshing the token using the refresh token. If refresh succeeds, retry the original request. If refresh fails (refresh token expired), redirect to login. Axios interceptors or fetch wrappers handle this pattern cleanly.
How do I deploy this securely?
Set SECRET_KEY from an environment variable, not hardcoded. Use HTTPS (TLS) in production. Add security headers: Strict-Transport-Security, X-Content-Type-Options, X-Frame-Options. Switch to PostgreSQL for the database. Use a production ASGI server like Uvicorn behind NGINX.

Next Steps

  • Add OAuth 2.0 social login with Google/GitHub
  • Switch to PostgreSQL for production scalability
  • Learn Docker to containerize the auth service
  • Build the REST API project to extend the backend

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro