Build an Image Processing Service with Python (Step by Step)
Build an image processing service with FastAPI and Pillow featuring image upload, resize, crop, rotate, blur/sharpen/grayscale filters, watermark overlay, async processing with background tasks, automatic thumbnail generation, and format conversion between JPEG, PNG, and WebP.
What You’ll Build
You’ll build a RESTful image processing API where clients upload images, apply transformations (resize, crop, rotate, filters, watermark), and receive processed results. The service uses background tasks for async processing, generates thumbnails automatically, and supports format conversion. DodaZIP uses this same image processing pipeline for its batch image converter.
Why Build an Image Processing Service?
Image processing is a core requirement for social media apps, e-commerce platforms, content management systems, and SaaS products. Building one teaches you file upload handling, image manipulation algorithms, async task processing, caching strategies, and REST API design for binary data — all skills you’ll use in photo sharing apps, profile picture uploaders, and document scanners.
Prerequisites
Step 1: Project Setup
mkdir image-processor
cd image-processor
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn pillow python-multipart aiofilesStep 2: Image Processing Engine
# processor.py
from PIL import Image, ImageFilter, ImageDraw, ImageFont
import io
import os
from typing import Optional, Tuple
class ImageProcessingError(Exception):
pass
SUPPORTED_FORMATS = {"JPEG": "image/jpeg", "PNG": "image/png", "WEBP": "image/webp"}
def open_image(data: bytes) -> Image.Image:
try:
img = Image.open(io.BytesIO(data))
img.load()
return img
except Exception as e:
raise ImageProcessingError(f"Cannot open image: {e}")
def resize_image(img: Image.Image, width: int, height: int, maintain_aspect: bool = True) -> Image.Image:
if maintain_aspect:
img.thumbnail((width, height), Image.LANCZOS)
else:
img = img.resize((width, height), Image.LANCZOS)
return img
def crop_image(img: Image.Image, x: int, y: int, width: int, height: int) -> Image.Image:
if x < 0 or y < 0 or width <= 0 or height <= 0:
raise ImageProcessingError("Crop dimensions must be positive")
if x + width > img.width or y + height > img.height:
raise ImageProcessingError("Crop area exceeds image boundaries")
return img.crop((x, y, x + width, y + height))
def rotate_image(img: Image.Image, degrees: float, expand: bool = True) -> Image.Image:
return img.rotate(degrees, expand=expand, resample=Image.BICUBIC)
def apply_filter(img: Image.Image, filter_name: str) -> Image.Image:
filters = {
"blur": ImageFilter.BLUR,
"sharpen": ImageFilter.SHARPEN,
"contour": ImageFilter.CONTOUR,
"emboss": ImageFilter.EMBOSS,
"edge_enhance": ImageFilter.EDGE_ENHANCE,
"smooth": ImageFilter.SMOOTH,
}
if filter_name in filters:
return img.filter(filters[filter_name])
elif filter_name == "grayscale":
return img.convert("L").convert("RGB")
else:
raise ImageProcessingError(f"Unknown filter: {filter_name}. Supported: {list(filters.keys()) + ['grayscale']}")
def apply_gaussian_blur(img: Image.Image, radius: float = 2.0) -> Image.Image:
return img.filter(ImageFilter.GaussianBlur(radius=radius))
def add_watermark(
img: Image.Image,
text: str = "DodaTech",
position: str = "bottom-right",
opacity: int = 128,
) -> Image.Image:
overlay = img.copy()
draw = ImageDraw.Draw(overlay, "RGBA")
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
except (IOError, OSError):
font = ImageFont.load_default()
bbox = draw.textbbox((0, 0), text, font=font)
text_w = bbox[2] - bbox[0]
text_h = bbox[3] - bbox[1]
padding = 20
positions = {
"top-left": (padding, padding),
"top-right": (img.width - text_w - padding, padding),
"bottom-left": (padding, img.height - text_h - padding),
"bottom-right": (img.width - text_w - padding, img.height - text_h - padding),
"center": ((img.width - text_w) // 2, (img.height - text_h) // 2),
}
pos = positions.get(position, positions["bottom-right"])
draw.text(pos, text, font=font, fill=(255, 255, 255, opacity))
return Image.alpha_composite(img.convert("RGBA"), overlay)
def convert_format(img: Image.Image, output_format: str) -> Image.Image:
output_format = output_format.upper()
if output_format not in SUPPORTED_FORMATS:
raise ImageProcessingError(f"Unsupported format: {output_format}. Supported: {list(SUPPORTED_FORMATS.keys())}")
if output_format == "JPEG" and img.mode in ("RGBA", "P", "LA"):
background = Image.new("RGB", img.size, (255, 255, 255))
if img.mode == "RGBA":
background.paste(img, mask=img.split()[3])
else:
background.paste(img)
img = background
elif output_format == "WEBP" and img.mode == "P":
img = img.convert("RGBA")
return img
def save_image(img: Image.Image, output_format: str, quality: int = 85) -> bytes:
output = io.BytesIO()
img.save(output, format=output_format.upper(), quality=quality)
return output.getvalue()
def generate_thumbnail(data: bytes, size: Tuple[int, int] = (150, 150)) -> bytes:
img = open_image(data)
img.thumbnail(size, Image.LANCZOS)
fmt = img.format or "JPEG"
return save_image(img, fmt)Step 3: FastAPI Application
# main.py
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
from fastapi.responses import Response, JSONResponse
from fastapi.staticfiles import StaticFiles
import os
import uuid
import aiofiles
from pathlib import Path
from typing import Optional
from processor import (
open_image, resize_image, crop_image, rotate_image,
apply_filter, apply_gaussian_blur, add_watermark,
convert_format, save_image, generate_thumbnail,
ImageProcessingError, SUPPORTED_FORMATS,
)
app = FastAPI(title="Image Processing Service", version="1.0.0")
UPLOAD_DIR = Path("uploads")
THUMBNAIL_DIR = Path("thumbnails")
UPLOAD_DIR.mkdir(exist_ok=True)
THUMBNAIL_DIR.mkdir(exist_ok=True)
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff"}
MAX_FILE_SIZE = 20 * 1024 * 1024 # 20MB
async def save_upload(file: UploadFile) -> tuple[str, bytes]:
content = await file.read()
if len(content) > MAX_FILE_SIZE:
raise HTTPException(status_code=413, detail="File too large (max 20MB)")
ext = Path(file.filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
raise HTTPException(status_code=400, detail=f"Unsupported file type: {ext}")
file_id = str(uuid.uuid4())
filepath = UPLOAD_DIR / f"{file_id}{ext}"
async with aiofiles.open(filepath, "wb") as f:
await f.write(content)
return file_id, content
@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
file_id, content = await save_upload(file)
return {"file_id": file_id, "filename": file.filename, "size": len(content)}
@app.post("/process/{file_id}")
async def process_image(
file_id: str,
background_tasks: BackgroundTasks,
resize_width: Optional[int] = Form(None),
resize_height: Optional[int] = Form(None),
crop_x: Optional[int] = Form(None),
crop_y: Optional[int] = Form(None),
crop_width: Optional[int] = Form(None),
crop_height: Optional[int] = Form(None),
rotate_degrees: Optional[float] = Form(None),
filter_name: Optional[str] = Form(None),
blur_radius: Optional[float] = Form(None),
watermark_text: Optional[str] = Form(None),
watermark_position: Optional[str] = Form("bottom-right"),
watermark_opacity: Optional[int] = Form(128),
output_format: Optional[str] = Form(None),
quality: Optional[int] = Form(85),
):
# Find the uploaded file
upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
if not upload_files:
raise HTTPException(status_code=404, detail="File not found")
filepath = upload_files[0]
data = filepath.read_bytes()
img = open_image(data)
original_format = img.format or "JPEG"
try:
# Apply transformations in order
if crop_x is not None and crop_y is not None and crop_width is not None and crop_height is not None:
img = crop_image(img, crop_x, crop_y, crop_width, crop_height)
if resize_width is not None and resize_height is not None:
img = resize_image(img, resize_width, resize_height)
if rotate_degrees is not None:
img = rotate_image(img, rotate_degrees)
if filter_name:
img = apply_filter(img, filter_name)
if blur_radius is not None:
img = apply_gaussian_blur(img, blur_radius)
if watermark_text:
img = add_watermark(img, watermark_text, watermark_position, watermark_opacity)
fmt = (output_format or original_format).upper()
img = convert_format(img, fmt)
result_data = save_image(img, fmt, quality=quality)
# Generate thumbnail in background
background_tasks.add_task(
generate_and_save_thumbnail, data, file_id
)
mime = SUPPORTED_FORMATS.get(fmt, "application/octet-stream")
filename = f"processed_{file_id}.{fmt.lower()}"
return Response(content=result_data, media_type=mime, headers={
"Content-Disposition": f'attachment; filename="{filename}"',
})
except ImageProcessingError as e:
raise HTTPException(status_code=400, detail=str(e))
async def generate_and_save_thumbnail(data: bytes, file_id: str):
try:
thumb_data = generate_thumbnail(data)
thumb_path = THUMBNAIL_DIR / f"{file_id}_thumb.jpg"
async with aiofiles.open(thumb_path, "wb") as f:
await f.write(thumb_data)
except Exception as e:
print(f"Thumbnail generation failed: {e}")
@app.get("/thumbnail/{file_id}")
async def get_thumbnail(file_id: str):
thumb_path = THUMBNAIL_DIR / f"{file_id}_thumb.jpg"
if not thumb_path.exists():
upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
if not upload_files:
raise HTTPException(status_code=404, detail="File not found")
data = upload_files[0].read_bytes()
thumb_data = generate_thumbnail(data)
async with aiofiles.open(thumb_path, "wb") as f:
await f.write(thumb_data)
return Response(content=thumb_data, media_type="image/jpeg")
thumb_data = thumb_path.read_bytes()
return Response(content=thumb_data, media_type="image/jpeg")
@app.get("/info/{file_id}")
async def get_image_info(file_id: str):
upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
if not upload_files:
raise HTTPException(status_code=404, detail="File not found")
img = open_image(upload_files[0].read_bytes())
return {
"file_id": file_id,
"filename": upload_files[0].name,
"format": img.format,
"mode": img.mode,
"width": img.width,
"height": img.height,
"size_bytes": upload_files[0].stat().st_size,
}
@app.delete("/upload/{file_id}")
async def delete_upload(file_id: str):
upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
thumbnail_files = list(THUMBNAIL_DIR.glob(f"{file_id}_thumb.*"))
deleted = 0
for f in upload_files + thumbnail_files:
f.unlink()
deleted += 1
if deleted == 0:
raise HTTPException(status_code=404, detail="File not found")
return {"deleted": deleted}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)Step 4: Run and Test
python main.pyExpected output:
INFO: Started server process [12345]
INFO: Uvicorn running on http://0.0.0.0:8000Test with curl:
# Upload
curl -X POST http://localhost:8000/upload \
-F "file=@test_image.jpg"
# Expected:
# {"file_id":"abc123...","filename":"test_image.jpg","size":..."
# Process: resize to 300x300, grayscale, output PNG
curl -X POST http://localhost:8000/process/abc123... \
-F "resize_width=300" \
-F "resize_height=300" \
-F "filter_name=grayscale" \
-F "output_format=PNG" \
--output processed.png
# Process: add watermark
curl -X POST http://localhost:8000/process/abc123... \
-F "watermark_text=DodaTech" \
-F "watermark_position=bottom-right" \
--output watermarked.jpg
# Get image info
curl http://localhost:8000/info/abc123...
# Expected:
# {"file_id":"abc123...","format":"JPEG","width":1920,"height":1080,...}
# Get thumbnail
curl http://localhost:8000/thumbnail/abc123... --output thumb.jpgArchitecture
sequenceDiagram
participant Client as Client App
participant API as FastAPI Service
participant Proc as Image Processor
participant Storage as File System
Client->>API: POST /upload (multipart file)
API->>Storage: Save file with UUID
API-->>Client: {file_id, filename, size}
Client->>API: POST /process/{file_id} (transform params)
API->>Storage: Read original file
API->>Proc: Apply transformations sequentially
Proc->>Proc: crop → resize → rotate → filter → watermark
Proc->>Proc: Convert format, save to bytes
API->>API: Queue thumbnail generation (background)
API-->>Client: Response with processed image bytes
Client->>API: GET /thumbnail/{file_id}
API->>Storage: Check for cached thumbnail
alt Cached thumbnail exists
API-->>Client: Return thumbnail bytes
else No cached thumbnail
API->>Storage: Read original
API->>Proc: Generate thumbnail
API->>Storage: Cache thumbnail
API-->>Client: Return thumbnail bytes
end
Common Errors
1. “Cannot open image” for seemingly valid files
Pillow opens files based on content (magic bytes), not extension. A .jpg file saved with wrong encoding (e.g., CMYK JPEG, progressive JPEG with incompatible libjpeg) can fail. Convert the file with a known-good tool first. Check Pillow’s supported format list — some formats require optional system libraries.
2. Watermark text appears cut off at edges The watermark position calculation uses text bounding box but doesn’t account for the text exceeding image boundaries after rotation. If the image is small (<100px), the watermark text will overflow. Add a minimum image size check for watermarking, or scale the font size relative to image dimensions.
3. Background task fails silently FastAPI background tasks don’t return errors to the client. If thumbnail generation fails, you’ll see nothing. Always wrap background task logic in try/except with logging. Add a health check endpoint that verifies thumbnail storage is writable.
4. JPEG output has pink/green tint
This happens when converting RGBA (with transparency) to JPEG. Our convert_format function handles this by compositing onto a white background. If the image has transparency and the target format is JPEG, ensure the background color is explicitly set to white (or a configurable color).
Practice Questions
1. Why process transformations sequentially in a specific order? The order matters because each operation changes the pixel data for the next one. Crop first (removes unwanted areas), then resize (work on the relevant area), then rotate (uses the cropped/resized dimensions), then apply filters and watermark. Doing filter before resize means processing more pixels unnecessarily (slower).
2. How does the thumbnail caching strategy work? Thumbnails are generated on first request and cached to disk. Subsequent requests return the cached file directly. The background task in the process endpoint pre-generates thumbnails so the first GET request is fast. For production, add a CDN (Cloudflare, Fastly) in front of the thumbnail endpoint.
3. Why use UUIDs for file IDs instead of the original filename?
UUIDs prevent collisions (two users uploading “photo.jpg”) and path traversal attacks ("../../etc/passwd"). The original filename is stored separately and sent as the download filename in the Content-Disposition header for user-friendly downloads.
4. Challenge: Add batch processing
Create a /process-batch endpoint that accepts multiple files and the same transformation parameters. Process them concurrently using asyncio.gather() or a thread pool. Return a ZIP file containing all processed images. Use io.BytesIO for in-memory ZIP creation.
5. Challenge: Add face detection blurring
Integrate OpenCV’s Haar Cascade classifier (cv2.CascadeClassifier) to detect faces in images. Add a blur_faces parameter. Detect face regions and apply Gaussian blur to only those regions. This is useful for privacy protection in photo-sharing apps.
FAQ
Next Steps
- Add Docker containerization for deployment
- Learn Redis for caching processed images
- Explore AWS Lambda for serverless image processing
- Build the File Converter project for more format conversions
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro