Build an Authentication System with JWT (Step by Step)
Build a complete authentication system from scratch with FastAPI featuring user registration, login with JWT access and refresh tokens, bcrypt password hashing, protected API routes, logout with token blacklisting, and SQLite persistence.
What You’ll Build
You’ll build a production-ready auth API where users register with email/password, receive JWT access tokens (short-lived) and refresh tokens (long-lived), access protected endpoints, and securely log out with token blacklisting. This same auth architecture secures Doda Browser’s sync service and Durga Antivirus Pro’s license validation.
Why Build Auth from Scratch?
Authentication is the gateway to every application. Building it from scratch teaches you password hashing (bcrypt/argon2), JWT structure and signing, refresh token rotation, blacklisting strategies, and security headers — knowledge you need whether you’re using Auth0, Firebase, or a custom solution. You’ll understand exactly what third-party auth providers do behind the scenes.
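To make "JWT structure and signing" concrete, here is a minimal standard-library sketch of how an HS256 token is assembled and checked. It is for illustration only: the tutorial itself uses python-jose, and the helper names (b64url, sign_hs256, verify_hs256) and the demo secret are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, the encoding JWT segments use."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature for an HS256 token (illustrative only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: str) -> bool:
    """Recompute the signature over the first two segments and compare in constant time."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        return False  # not exactly three segments
    expected = hmac.new(
        secret.encode("utf-8"),
        f"{header_b64}.{payload_b64}".encode("utf-8"),
        hashlib.sha256,
    ).digest()
    return hmac.compare_digest(
        b64url(expected).encode("utf-8"), signature_b64.encode("utf-8")
    )


token = sign_hs256({"sub": "1", "type": "access"}, "demo-secret")
print(token.count("."))                     # 2 dots, so three segments
print(verify_hs256(token, "demo-secret"))   # True
print(verify_hs256(token, "wrong-secret"))  # False
```

The sketch checks only the signature. A real library also validates claims such as exp, which is one reason to use one in practice.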
Prerequisites
- Python 3.10+ installed
- Basic FastAPI knowledge
- Understanding of REST APIs
- Familiarity with JWT concepts
Step 1: Project Setup
mkdir auth-system
cd auth-system
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart "pydantic[email]" sqlalchemy aiosqlite
Step 2: Database Models
# database.py
from datetime import datetime, timezone

from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./auth.db"

# check_same_thread=False lets FastAPI use the SQLite connection from multiple threads
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )


class BlacklistedToken(Base):
    __tablename__ = "blacklisted_tokens"

    id = Column(Integer, primary_key=True, index=True)
    token_jti = Column(String, unique=True, index=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    blacklisted_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))


# Create the tables on import if they do not exist yet
Base.metadata.create_all(bind=engine)


def get_db():
    """FastAPI dependency: yield a session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Step 3: Auth Utilities
# auth.py
from datetime import datetime, timedelta, timezone
from typing import Optional
import uuid

from jose import JWTError, jwt
from passlib.context import CryptContext

# Placeholder only: load a long random secret from the environment in production
SECRET_KEY = "your-secret-key-change-in-production-min-32-chars"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict) -> str:
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),  # unique ID, used for blacklisting
        "type": "access",
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    now = datetime.now(timezone.utc)
    expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "refresh",
    })
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> Optional[dict]:
    """Return the payload, or None if the token is malformed, tampered with, or expired."""
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None


def get_token_jti(token: str) -> Optional[str]:
    payload = decode_token(token)
    return payload.get("jti") if payload else None
Step 4: Pydantic Schemas
# schemas.py
from datetime import datetime

from pydantic import BaseModel, ConfigDict, EmailStr


class UserCreate(BaseModel):
    email: EmailStr
    username: str
    password: str


class UserLogin(BaseModel):
    email: EmailStr
    password: str


class UserResponse(BaseModel):
    # Pydantic v2: allow building the response directly from a SQLAlchemy User object
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    username: str
    is_active: bool
    created_at: datetime


class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class RefreshRequest(BaseModel):
    refresh_token: str


class ChangePasswordRequest(BaseModel):
    current_password: str
    new_password: str
Step 5: API Routes
# main.py
from datetime import datetime, timezone

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from database import get_db, User, BlacklistedToken
from schemas import UserCreate, UserLogin, TokenResponse, RefreshRequest, UserResponse, ChangePasswordRequest
from auth import hash_password, verify_password, create_access_token, create_refresh_token, decode_token

app = FastAPI(title="Auth API", version="1.0.0")
security = HTTPBearer()


def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Dependency for protected routes: validate the access token and load its user."""
    token = credentials.credentials
    payload = decode_token(token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid token")
    # Refresh tokens must not be accepted as access tokens
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type")
    jti = payload.get("jti")
    blacklisted = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
    if blacklisted:
        raise HTTPException(status_code=401, detail="Token has been revoked")
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token payload")
    user = db.query(User).filter(User.id == int(user_id)).first()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    return user


@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    existing = db.query(User).filter(
        (User.email == user_data.email) | (User.username == user_data.username)
    ).first()
    if existing:
        raise HTTPException(status_code=409, detail="Email or username already registered")
    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hash_password(user_data.password),  # never store the plain password
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@app.post("/auth/login", response_model=TokenResponse)
def login(login_data: UserLogin, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == login_data.email).first()
    # Same error for unknown email and wrong password, so the response does not reveal which failed
    if not user or not verify_password(login_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Invalid email or password")
    token_data = {"sub": str(user.id), "email": user.email, "username": user.username}
    return TokenResponse(
        access_token=create_access_token(token_data),
        refresh_token=create_refresh_token(token_data),
    )


@app.post("/auth/refresh", response_model=TokenResponse)
def refresh_token(refresh_data: RefreshRequest, db: Session = Depends(get_db)):
    payload = decode_token(refresh_data.refresh_token)
    if not payload:
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid token type")
    jti = payload.get("jti")
    blacklisted = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
    if blacklisted:
        raise HTTPException(status_code=401, detail="Refresh token has been revoked")
    # Blacklist the used refresh token (rotation): each refresh token works once
    exp = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
    db.add(BlacklistedToken(token_jti=jti, expires_at=exp))
    db.commit()
    user_id = payload.get("sub")
    user = db.query(User).filter(User.id == int(user_id)).first()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")
    token_data = {"sub": str(user.id), "email": user.email, "username": user.username}
    return TokenResponse(
        access_token=create_access_token(token_data),
        refresh_token=create_refresh_token(token_data),
    )


@app.post("/auth/logout")
def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    access_payload = decode_token(credentials.credentials)
    # An invalid or expired token needs no blacklisting; it is already rejected
    if access_payload:
        jti = access_payload.get("jti")
        exp = datetime.fromtimestamp(access_payload["exp"], tz=timezone.utc)
        existing = db.query(BlacklistedToken).filter(BlacklistedToken.token_jti == jti).first()
        if not existing:
            db.add(BlacklistedToken(token_jti=jti, expires_at=exp))
            db.commit()
    return {"message": "Logged out successfully"}


@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_user)):
    return current_user


@app.post("/auth/change-password")
def change_password(
    req: ChangePasswordRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    if not verify_password(req.current_password, current_user.hashed_password):
        raise HTTPException(status_code=400, detail="Current password is incorrect")
    current_user.hashed_password = hash_password(req.new_password)
    db.commit()
    return {"message": "Password changed successfully"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Step 6: Run and Test
python main.py
Expected output:
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000
Test the API with curl:
# Register
curl -X POST http://localhost:8000/auth/register \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","username":"testuser","password":"SecurePass123!"}'
# Expected output:
# {"id":1,"email":"test@example.com","username":"testuser","is_active":true,"created_at":"2026-06-20T..."}
# Login
curl -X POST http://localhost:8000/auth/login \
-H "Content-Type: application/json" \
-d '{"email":"test@example.com","password":"SecurePass123!"}'
# Expected output:
# {"access_token":"eyJhbGci...","refresh_token":"eyJhbGci...","token_type":"bearer"}
# Access protected endpoint (replace TOKEN with the access_token from the login response)
curl http://localhost:8000/auth/me \
  -H "Authorization: Bearer TOKEN"
# Expected output:
# {"id":1,"email":"test@example.com","username":"testuser","is_active":true,"created_at":"2026-06-20T..."}
Architecture
sequenceDiagram
participant Client as Client App
participant API as FastAPI Auth API
participant DB as SQLite Database
Client->>API: POST /auth/register {email, username, password}
API->>DB: Check uniqueness
API->>DB: INSERT user (bcrypt hash)
API-->>Client: 201 {user data}
Client->>API: POST /auth/login {email, password}
API->>DB: SELECT user by email
API->>API: Verify password with bcrypt
API->>API: Generate access_token (15min) + refresh_token (7d)
API-->>Client: 200 {tokens}
Client->>API: GET /auth/me (Bearer access_token)
API->>DB: Check token not blacklisted
API->>API: Decode JWT, extract user_id
API->>DB: SELECT user by id
API-->>Client: 200 {user data}
Client->>API: POST /auth/refresh {refresh_token}
API->>DB: Blacklist old refresh token
API->>API: Generate new token pair
API-->>Client: 200 {new tokens}
Client->>API: POST /auth/logout (Bearer access_token)
API->>DB: INSERT token_jti into blacklist
API-->>Client: 200 {logged out}
Common Errors
1. “Not enough segments” JWT error
This error means the token string has fewer than three parts. JWTs have three dot-separated segments: header.payload.signature. It usually appears when only part of the token was copied or the token was truncated along the way. Send the full token, and make sure the Authorization header uses the format Bearer <full_token>.
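A quick way to catch this before the token ever reaches the JWT library is to check the header shape first. The sketch below uses only the standard library; extract_bearer_token is a hypothetical helper for this example, not part of the tutorial code, where HTTPBearer already parses the header.

```python
from typing import Optional


def extract_bearer_token(authorization_header: str) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None if it is malformed."""
    scheme, _, token = authorization_header.strip().partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    # A signed JWT is header.payload.signature: exactly three non-empty segments.
    segments = token.split(".")
    if len(segments) != 3 or not all(segments):
        return None
    return token


print(extract_bearer_token("Bearer aaa.bbb.ccc"))  # aaa.bbb.ccc
print(extract_bearer_token("Bearer aaa.bbb"))      # None (truncated token)
print(extract_bearer_token("aaa.bbb.ccc"))         # None (missing "Bearer" scheme)
```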
2. bcrypt “Invalid salt” error
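This error comes from the bcrypt library when the stored value is not a well-formed bcrypt hash, for example when a plain-text password or a truncated hash ended up in hashed_password. Check that registration stores the output of hash_password (a 60-character string starting with $2b$) and that nothing shortens it on the way to the database. Importing the single pwd_context from your auth module, rather than creating new CryptContext instances elsewhere, also keeps the hashing configuration consistent.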
3. Token still works after logout
Logout blacklists only the JTI (JWT ID) of the access token sent with the request. Blacklisting blocks that one token and nothing else: any other access token already issued to the user stays valid until it expires, and the refresh token can still be exchanged for a new pair. For a full logout, have the client send its refresh token as well and blacklist both JTIs, and keep access tokens short-lived.
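The idea of revoking both halves of a session can be sketched without a database. This is a minimal illustration, assuming already-decoded token payloads; the in-memory revoked dict stands in for the blacklisted_tokens table, and revoke, logout, and is_revoked are hypothetical names.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical in-memory stand-in for the blacklisted_tokens table: jti -> expiry
revoked: dict[str, datetime] = {}


def revoke(payload: dict) -> None:
    """Record a decoded token's jti until the time the token would expire anyway."""
    expires_at = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
    revoked.setdefault(payload["jti"], expires_at)


def logout(access_payload: dict, refresh_payload: dict) -> None:
    """Revoke both tokens so the refresh token cannot mint a new access token."""
    revoke(access_payload)
    revoke(refresh_payload)


def is_revoked(payload: dict) -> bool:
    return payload["jti"] in revoked


now = datetime.now(timezone.utc)
access = {"jti": "a-1", "type": "access", "exp": (now + timedelta(minutes=15)).timestamp()}
refresh = {"jti": "r-1", "type": "refresh", "exp": (now + timedelta(days=7)).timestamp()}

logout(access, refresh)
print(is_revoked(access), is_revoked(refresh))  # True True
```

In the tutorial's API this would mean accepting a refresh_token in the logout request body and adding a second BlacklistedToken row for it.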
4. SQLite “database is locked” with concurrent requests
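SQLite allows only one writer at a time, so concurrent writes can fail with this error. That’s fine for development, but for production with multiple users, switch to PostgreSQL. SQLAlchemy abstracts the database, so the models stay the same: change the connection URL in database.py, install a PostgreSQL driver, and remove the SQLite-only check_same_thread connect argument.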
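One way to make that switch a configuration change rather than a code change is to read the URL from the environment. The sketch below is an assumption-laden example: the DATABASE_URL variable name and the engine_settings helper are made up for illustration.

```python
import os


def engine_settings(default_url: str = "sqlite:///./auth.db") -> tuple[str, dict]:
    """Return (url, connect_args) to pass to SQLAlchemy's create_engine."""
    url = os.environ.get("DATABASE_URL", default_url)  # hypothetical variable name
    # check_same_thread is a sqlite3-specific option; only send it to SQLite.
    connect_args = {"check_same_thread": False} if url.startswith("sqlite") else {}
    return url, connect_args


url, connect_args = engine_settings()
print(url, connect_args)

# In database.py this would become:
# engine = create_engine(url, connect_args=connect_args)
```

With this in place, exporting DATABASE_URL with a postgresql:// URL before starting the server is enough to point the app at PostgreSQL.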
Practice Questions
1. Why are access tokens short-lived (15 min) and refresh tokens long-lived (7 days)?
Short-lived access tokens limit the damage if one is stolen, because it expires quickly. The refresh token, which should be stored more securely (an HTTP-only cookie rather than localStorage), lets the client obtain new access tokens without re-authenticating. Rotation limits the damage from a stolen refresh token: each one is blacklisted as soon as it is used, so it works at most once.
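The tutorial’s API returns the refresh token in the JSON body, so the cookie storage mentioned above is left to you. As a rough sketch of what such a cookie looks like, the standard library can build the Set-Cookie value; refresh_cookie_header and the /auth/refresh path scoping are assumptions for this example.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token: str, max_age_days: int = 7) -> str:
    """Build a Set-Cookie value that keeps the refresh token away from page JavaScript."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True           # not readable via document.cookie
    morsel["secure"] = True             # only sent over HTTPS
    morsel["samesite"] = "Strict"       # not sent on cross-site requests
    morsel["path"] = "/auth/refresh"    # only sent to the refresh endpoint
    morsel["max-age"] = max_age_days * 24 * 60 * 60
    return morsel.OutputString()


print(refresh_cookie_header("abc.def.ghi"))
```

In FastAPI the same attributes are available as keyword arguments of Response.set_cookie (httponly, secure, samesite, max_age, path).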
2. How does token blacklisting work?
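The JTI (unique ID) of a token is stored in the blacklisted_tokens table when a user logs out or when a refresh token is rotated. On every authenticated request, the server checks whether the token’s JTI is in the blacklist. An entry can be deleted once its expires_at has passed, because the token would be rejected as expired anyway. The code in this tutorial does not do that, so add a periodic cleanup job to prevent table bloat.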
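A cleanup job of that kind boils down to one DELETE statement. Here is a self-contained sketch using the standard sqlite3 module against an in-memory table that mirrors the tutorial’s column names; purge_expired and as_text are hypothetical helpers, and the text timestamp format is chosen for this demo only.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def as_text(moment: datetime) -> str:
    """Format a datetime as UTC text so that string order matches time order."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def purge_expired(conn: sqlite3.Connection, now: datetime) -> int:
    """Delete blacklist rows whose token has already expired; return how many were removed."""
    cursor = conn.execute(
        "DELETE FROM blacklisted_tokens WHERE expires_at < ?", (as_text(now),)
    )
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE blacklisted_tokens (token_jti TEXT UNIQUE, expires_at TEXT)")
now = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
conn.executemany(
    "INSERT INTO blacklisted_tokens VALUES (?, ?)",
    [
        ("old", as_text(now - timedelta(hours=1))),   # token already expired
        ("live", as_text(now + timedelta(hours=1))),  # token still valid
    ],
)

print(purge_expired(conn, now))  # 1
print(conn.execute("SELECT token_jti FROM blacklisted_tokens").fetchall())  # [('live',)]
```

With the tutorial’s SQLAlchemy session, the equivalent would be along the lines of db.query(BlacklistedToken).filter(BlacklistedToken.expires_at < now).delete() followed by db.commit(), run on a schedule.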
3. What’s the difference between passlib’s bcrypt and Django’s PBKDF2?
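bcrypt is adaptive: its cost factor makes hashing intentionally slow and can be raised as hardware gets faster (passlib defaults to 12 rounds). PBKDF2, Django’s default, is tunable in the same way through its iteration count, and both store the salt and cost alongside the hash. The practical difference is resistance to specialized hardware: PBKDF2 is cheap to parallelize on GPUs and ASICs, while bcrypt’s memory access pattern makes that harder, which is why bcrypt (or argon2) is usually preferred for new projects.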
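To see what "salt and cost stored alongside the hash" means, here is a small PBKDF2 sketch using only hashlib. The storage format is made up for this example (it is not Django’s or passlib’s exact format), and the demo uses a reduced iteration count so it runs quickly.

```python
import hashlib
import hmac
import os


def pbkdf2_hash(password: str, iterations: int = 600_000) -> str:
    """Hash with PBKDF2-HMAC-SHA256 and pack algorithm, cost, salt, and digest into one string."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def pbkdf2_verify(password: str, stored: str) -> bool:
    """Re-derive the digest using the salt and cost recorded in the stored string."""
    _, iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = pbkdf2_hash("SecurePass123!", iterations=100_000)  # reduced cost for the demo
print(stored.split("$")[:2])                    # ['pbkdf2_sha256', '100000']
print(pbkdf2_verify("SecurePass123!", stored))  # True
print(pbkdf2_verify("wrong", stored))           # False
```

Because the cost is recorded per hash, it can be raised for new passwords without invalidating old ones, which is the same property bcrypt’s $2b$12$... prefix provides.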
4. Challenge: Add email verification
The User model already has an is_verified boolean field that defaults to False. On registration, generate a signed token (a JWT containing the email, with a 24h expiry) and send a verification link. Create a GET /auth/verify?token=... endpoint that marks the user as verified. Require is_verified=True before the user can access certain endpoints.
5. Challenge: Implement rate limiting
Add rate limiting to the login endpoint to prevent brute-force attacks. Use a Redis-backed rate limiter or an in-memory dictionary with timestamps per IP. Allow 5 login attempts per minute per IP. After 5 failures, return 429 Too Many Requests with a Retry-After header.
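As a starting point for the in-memory variant, here is a minimal sliding-window sketch. LoginRateLimiter and its check method are hypothetical names; state lives in process memory, so it resets on restart and is not shared between workers, which is where a Redis-backed limiter comes in.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class LoginRateLimiter:
    """Sliding window: at most `limit` attempts per `window` seconds per key (e.g. client IP)."""

    def __init__(self, limit: int = 5, window: float = 60.0) -> None:
        self.limit = limit
        self.window = window
        self._attempts: dict[str, deque] = defaultdict(deque)

    def check(self, key: str, now: Optional[float] = None) -> float:
        """Record an attempt. Return 0.0 if allowed, else the seconds to wait (for Retry-After)."""
        current = time.monotonic() if now is None else now
        attempts = self._attempts[key]
        # Drop timestamps that have slid out of the window.
        while attempts and current - attempts[0] >= self.window:
            attempts.popleft()
        if len(attempts) >= self.limit:
            return self.window - (current - attempts[0])
        attempts.append(current)
        return 0.0


limiter = LoginRateLimiter()
results = [limiter.check("203.0.113.7", now=float(t)) for t in range(6)]
print(results)                                 # [0.0, 0.0, 0.0, 0.0, 0.0, 55.0]
print(limiter.check("203.0.113.7", now=60.0))  # 0.0 (the oldest attempt has left the window)
```

In the login route, a non-zero return value would translate into an HTTPException with status 429 and a Retry-After header. Note that this sketch counts every attempt, not only failed ones.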
Next Steps
- Add OAuth 2.0 social login with Google/GitHub
- Switch to PostgreSQL for production scalability
- Learn Docker to containerize the auth service
- Build the REST API project to extend the backend
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro