Skip to content
Build an Image Processing Service with Python (Step by Step)

Build an Image Processing Service with Python (Step by Step)

DodaTech Updated Jun 20, 2026 10 min read

Build an image processing service with FastAPI and Pillow featuring image upload, resize, crop, rotate, blur/sharpen/grayscale filters, watermark overlay, async processing with background tasks, automatic thumbnail generation, and format conversion between JPEG, PNG, and WebP.

What You’ll Build

You’ll build a RESTful image processing API where clients upload images, apply transformations (resize, crop, rotate, filters, watermark), and receive processed results. The service uses background tasks for async processing, generates thumbnails automatically, and supports format conversion. DodaZIP uses this same image processing pipeline for its batch image converter.

Why Build an Image Processing Service?

Image processing is a core requirement for social media apps, e-commerce platforms, content management systems, and SaaS products. Building one teaches you file upload handling, image manipulation algorithms, async task processing, caching strategies, and REST API design for binary data — all skills you’ll use in photo sharing apps, profile picture uploaders, and document scanners.

Prerequisites

Step 1: Project Setup

mkdir image-processor
cd image-processor
python -m venv venv
source venv/bin/activate
pip install fastapi uvicorn pillow python-multipart aiofiles

Step 2: Image Processing Engine

# processor.py
from PIL import Image, ImageFilter, ImageDraw, ImageFont
import io
import os
from typing import Optional, Tuple

class ImageProcessingError(Exception):
    pass

SUPPORTED_FORMATS = {"JPEG": "image/jpeg", "PNG": "image/png", "WEBP": "image/webp"}

def open_image(data: bytes) -> Image.Image:
    try:
        img = Image.open(io.BytesIO(data))
        img.load()
        return img
    except Exception as e:
        raise ImageProcessingError(f"Cannot open image: {e}")

def resize_image(img: Image.Image, width: int, height: int, maintain_aspect: bool = True) -> Image.Image:
    if maintain_aspect:
        img.thumbnail((width, height), Image.LANCZOS)
    else:
        img = img.resize((width, height), Image.LANCZOS)
    return img

def crop_image(img: Image.Image, x: int, y: int, width: int, height: int) -> Image.Image:
    if x < 0 or y < 0 or width <= 0 or height <= 0:
        raise ImageProcessingError("Crop dimensions must be positive")
    if x + width > img.width or y + height > img.height:
        raise ImageProcessingError("Crop area exceeds image boundaries")
    return img.crop((x, y, x + width, y + height))

def rotate_image(img: Image.Image, degrees: float, expand: bool = True) -> Image.Image:
    return img.rotate(degrees, expand=expand, resample=Image.BICUBIC)

def apply_filter(img: Image.Image, filter_name: str) -> Image.Image:
    filters = {
        "blur": ImageFilter.BLUR,
        "sharpen": ImageFilter.SHARPEN,
        "contour": ImageFilter.CONTOUR,
        "emboss": ImageFilter.EMBOSS,
        "edge_enhance": ImageFilter.EDGE_ENHANCE,
        "smooth": ImageFilter.SMOOTH,
    }
    if filter_name in filters:
        return img.filter(filters[filter_name])
    elif filter_name == "grayscale":
        return img.convert("L").convert("RGB")
    else:
        raise ImageProcessingError(f"Unknown filter: {filter_name}. Supported: {list(filters.keys()) + ['grayscale']}")

def apply_gaussian_blur(img: Image.Image, radius: float = 2.0) -> Image.Image:
    return img.filter(ImageFilter.GaussianBlur(radius=radius))

def add_watermark(
    img: Image.Image,
    text: str = "DodaTech",
    position: str = "bottom-right",
    opacity: int = 128,
) -> Image.Image:
    overlay = img.copy()
    draw = ImageDraw.Draw(overlay, "RGBA")

    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 36)
    except (IOError, OSError):
        font = ImageFont.load_default()

    bbox = draw.textbbox((0, 0), text, font=font)
    text_w = bbox[2] - bbox[0]
    text_h = bbox[3] - bbox[1]

    padding = 20
    positions = {
        "top-left": (padding, padding),
        "top-right": (img.width - text_w - padding, padding),
        "bottom-left": (padding, img.height - text_h - padding),
        "bottom-right": (img.width - text_w - padding, img.height - text_h - padding),
        "center": ((img.width - text_w) // 2, (img.height - text_h) // 2),
    }

    pos = positions.get(position, positions["bottom-right"])
    draw.text(pos, text, font=font, fill=(255, 255, 255, opacity))

    return Image.alpha_composite(img.convert("RGBA"), overlay)

def convert_format(img: Image.Image, output_format: str) -> Image.Image:
    output_format = output_format.upper()
    if output_format not in SUPPORTED_FORMATS:
        raise ImageProcessingError(f"Unsupported format: {output_format}. Supported: {list(SUPPORTED_FORMATS.keys())}")

    if output_format == "JPEG" and img.mode in ("RGBA", "P", "LA"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        if img.mode == "RGBA":
            background.paste(img, mask=img.split()[3])
        else:
            background.paste(img)
        img = background
    elif output_format == "WEBP" and img.mode == "P":
        img = img.convert("RGBA")

    return img

def save_image(img: Image.Image, output_format: str, quality: int = 85) -> bytes:
    output = io.BytesIO()
    img.save(output, format=output_format.upper(), quality=quality)
    return output.getvalue()

def generate_thumbnail(data: bytes, size: Tuple[int, int] = (150, 150)) -> bytes:
    img = open_image(data)
    img.thumbnail(size, Image.LANCZOS)
    fmt = img.format or "JPEG"
    return save_image(img, fmt)

Step 3: FastAPI Application

# main.py
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
from fastapi.responses import Response, JSONResponse
from fastapi.staticfiles import StaticFiles
import os
import uuid
import aiofiles
from pathlib import Path
from typing import Optional

from processor import (
    open_image, resize_image, crop_image, rotate_image,
    apply_filter, apply_gaussian_blur, add_watermark,
    convert_format, save_image, generate_thumbnail,
    ImageProcessingError, SUPPORTED_FORMATS,
)

app = FastAPI(title="Image Processing Service", version="1.0.0")

UPLOAD_DIR = Path("uploads")
THUMBNAIL_DIR = Path("thumbnails")
UPLOAD_DIR.mkdir(exist_ok=True)
THUMBNAIL_DIR.mkdir(exist_ok=True)

ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".gif", ".bmp", ".tiff"}
MAX_FILE_SIZE = 20 * 1024 * 1024  # 20MB

async def save_upload(file: UploadFile) -> tuple[str, bytes]:
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large (max 20MB)")

    ext = Path(file.filename).suffix.lower()
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {ext}")

    file_id = str(uuid.uuid4())
    filepath = UPLOAD_DIR / f"{file_id}{ext}"
    async with aiofiles.open(filepath, "wb") as f:
        await f.write(content)

    return file_id, content

@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
    file_id, content = await save_upload(file)
    return {"file_id": file_id, "filename": file.filename, "size": len(content)}

@app.post("/process/{file_id}")
async def process_image(
    file_id: str,
    background_tasks: BackgroundTasks,
    resize_width: Optional[int] = Form(None),
    resize_height: Optional[int] = Form(None),
    crop_x: Optional[int] = Form(None),
    crop_y: Optional[int] = Form(None),
    crop_width: Optional[int] = Form(None),
    crop_height: Optional[int] = Form(None),
    rotate_degrees: Optional[float] = Form(None),
    filter_name: Optional[str] = Form(None),
    blur_radius: Optional[float] = Form(None),
    watermark_text: Optional[str] = Form(None),
    watermark_position: Optional[str] = Form("bottom-right"),
    watermark_opacity: Optional[int] = Form(128),
    output_format: Optional[str] = Form(None),
    quality: Optional[int] = Form(85),
):
    # Find the uploaded file
    upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
    if not upload_files:
        raise HTTPException(status_code=404, detail="File not found")

    filepath = upload_files[0]
    data = filepath.read_bytes()
    img = open_image(data)
    original_format = img.format or "JPEG"

    try:
        # Apply transformations in order
        if crop_x is not None and crop_y is not None and crop_width is not None and crop_height is not None:
            img = crop_image(img, crop_x, crop_y, crop_width, crop_height)

        if resize_width is not None and resize_height is not None:
            img = resize_image(img, resize_width, resize_height)

        if rotate_degrees is not None:
            img = rotate_image(img, rotate_degrees)

        if filter_name:
            img = apply_filter(img, filter_name)

        if blur_radius is not None:
            img = apply_gaussian_blur(img, blur_radius)

        if watermark_text:
            img = add_watermark(img, watermark_text, watermark_position, watermark_opacity)

        fmt = (output_format or original_format).upper()
        img = convert_format(img, fmt)
        result_data = save_image(img, fmt, quality=quality)

        # Generate thumbnail in background
        background_tasks.add_task(
            generate_and_save_thumbnail, data, file_id
        )

        mime = SUPPORTED_FORMATS.get(fmt, "application/octet-stream")
        filename = f"processed_{file_id}.{fmt.lower()}"

        return Response(content=result_data, media_type=mime, headers={
            "Content-Disposition": f'attachment; filename="{filename}"',
        })

    except ImageProcessingError as e:
        raise HTTPException(status_code=400, detail=str(e))

async def generate_and_save_thumbnail(data: bytes, file_id: str):
    try:
        thumb_data = generate_thumbnail(data)
        thumb_path = THUMBNAIL_DIR / f"{file_id}_thumb.jpg"
        async with aiofiles.open(thumb_path, "wb") as f:
            await f.write(thumb_data)
    except Exception as e:
        print(f"Thumbnail generation failed: {e}")

@app.get("/thumbnail/{file_id}")
async def get_thumbnail(file_id: str):
    thumb_path = THUMBNAIL_DIR / f"{file_id}_thumb.jpg"
    if not thumb_path.exists():
        upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
        if not upload_files:
            raise HTTPException(status_code=404, detail="File not found")
        data = upload_files[0].read_bytes()
        thumb_data = generate_thumbnail(data)
        async with aiofiles.open(thumb_path, "wb") as f:
            await f.write(thumb_data)
        return Response(content=thumb_data, media_type="image/jpeg")

    thumb_data = thumb_path.read_bytes()
    return Response(content=thumb_data, media_type="image/jpeg")

@app.get("/info/{file_id}")
async def get_image_info(file_id: str):
    upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
    if not upload_files:
        raise HTTPException(status_code=404, detail="File not found")

    img = open_image(upload_files[0].read_bytes())
    return {
        "file_id": file_id,
        "filename": upload_files[0].name,
        "format": img.format,
        "mode": img.mode,
        "width": img.width,
        "height": img.height,
        "size_bytes": upload_files[0].stat().st_size,
    }

@app.delete("/upload/{file_id}")
async def delete_upload(file_id: str):
    upload_files = list(UPLOAD_DIR.glob(f"{file_id}.*"))
    thumbnail_files = list(THUMBNAIL_DIR.glob(f"{file_id}_thumb.*"))

    deleted = 0
    for f in upload_files + thumbnail_files:
        f.unlink()
        deleted += 1

    if deleted == 0:
        raise HTTPException(status_code=404, detail="File not found")

    return {"deleted": deleted}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Step 4: Run and Test

python main.py

Expected output:

INFO:     Started server process [12345]
INFO:     Uvicorn running on http://0.0.0.0:8000

Test with curl:

# Upload
curl -X POST http://localhost:8000/upload \
  -F "file=@test_image.jpg"

# Expected:
# {"file_id":"abc123...","filename":"test_image.jpg","size":..."

# Process: resize to 300x300, grayscale, output PNG
curl -X POST http://localhost:8000/process/abc123... \
  -F "resize_width=300" \
  -F "resize_height=300" \
  -F "filter_name=grayscale" \
  -F "output_format=PNG" \
  --output processed.png

# Process: add watermark
curl -X POST http://localhost:8000/process/abc123... \
  -F "watermark_text=DodaTech" \
  -F "watermark_position=bottom-right" \
  --output watermarked.jpg

# Get image info
curl http://localhost:8000/info/abc123...

# Expected:
# {"file_id":"abc123...","format":"JPEG","width":1920,"height":1080,...}

# Get thumbnail
curl http://localhost:8000/thumbnail/abc123... --output thumb.jpg

Architecture


sequenceDiagram
    participant Client as Client App
    participant API as FastAPI Service
    participant Proc as Image Processor
    participant Storage as File System

    Client->>API: POST /upload (multipart file)
    API->>Storage: Save file with UUID
    API-->>Client: {file_id, filename, size}

    Client->>API: POST /process/{file_id} (transform params)
    API->>Storage: Read original file
    API->>Proc: Apply transformations sequentially
    Proc->>Proc: crop → resize → rotate → filter → watermark
    Proc->>Proc: Convert format, save to bytes
    API->>API: Queue thumbnail generation (background)
    API-->>Client: Response with processed image bytes

    Client->>API: GET /thumbnail/{file_id}
    API->>Storage: Check for cached thumbnail
    alt Cached thumbnail exists
        API-->>Client: Return thumbnail bytes
    else No cached thumbnail
        API->>Storage: Read original
        API->>Proc: Generate thumbnail
        API->>Storage: Cache thumbnail
        API-->>Client: Return thumbnail bytes
    end

Common Errors

1. “Cannot open image” for seemingly valid files Pillow opens files based on content (magic bytes), not extension. A .jpg file saved with wrong encoding (e.g., CMYK JPEG, progressive JPEG with incompatible libjpeg) can fail. Convert the file with a known-good tool first. Check Pillow’s supported format list — some formats require optional system libraries.

2. Watermark text appears cut off at edges The watermark position calculation uses text bounding box but doesn’t account for the text exceeding image boundaries after rotation. If the image is small (<100px), the watermark text will overflow. Add a minimum image size check for watermarking, or scale the font size relative to image dimensions.

3. Background task fails silently FastAPI background tasks don’t return errors to the client. If thumbnail generation fails, you’ll see nothing. Always wrap background task logic in try/except with logging. Add a health check endpoint that verifies thumbnail storage is writable.

4. JPEG output has pink/green tint This happens when converting RGBA (with transparency) to JPEG. Our convert_format function handles this by compositing onto a white background. If the image has transparency and the target format is JPEG, ensure the background color is explicitly set to white (or a configurable color).

Practice Questions

1. Why process transformations sequentially in a specific order? The order matters because each operation changes the pixel data for the next one. Crop first (removes unwanted areas), then resize (work on the relevant area), then rotate (uses the cropped/resized dimensions), then apply filters and watermark. Doing filter before resize means processing more pixels unnecessarily (slower).

2. How does the thumbnail caching strategy work? Thumbnails are generated on first request and cached to disk. Subsequent requests return the cached file directly. The background task in the process endpoint pre-generates thumbnails so the first GET request is fast. For production, add a CDN (Cloudflare, Fastly) in front of the thumbnail endpoint.

3. Why use UUIDs for file IDs instead of the original filename? UUIDs prevent collisions (two users uploading “photo.jpg”) and path traversal attacks ("../../etc/passwd"). The original filename is stored separately and sent as the download filename in the Content-Disposition header for user-friendly downloads.

4. Challenge: Add batch processing Create a /process-batch endpoint that accepts multiple files and the same transformation parameters. Process them concurrently using asyncio.gather() or a thread pool. Return a ZIP file containing all processed images. Use io.BytesIO for in-memory ZIP creation.

5. Challenge: Add face detection blurring Integrate OpenCV’s Haar Cascade classifier (cv2.CascadeClassifier) to detect faces in images. Add a blur_faces parameter. Detect face regions and apply Gaussian blur to only those regions. This is useful for privacy protection in photo-sharing apps.

FAQ

How do I add more filters?
Pillow’s ImageFilter module has built-in filters: BLUR, CONTOUR, DETAIL, EDGE_ENHANCE, EMBOSS, SHARPEN, SMOOTH. For custom filters, use ImageFilter.Kernel() with a custom 3x3 or 5x5 convolution matrix. For artistic filters, explore Pillow’s ImageOps module for posterize, solarize, and equalize.
How do I handle very large images (>10000px)?
Pillow loads the entire image into memory. For large images, use Image.open() with Image.LANCZOS for downsampling during read. Set Image.MAX_IMAGE_PIXELS to avoid decompression bombs. For massive images, consider a dedicated image server like ImageMagick’s convert command or cloud services like Cloudinary.
How do I deploy this?
Package with Docker: use a slim Python image, install system libs for Pillow (libjpeg, zlib, libwebp). Run with Gunicorn + Uvicorn workers: gunicorn -k uvicorn.workers.UvicornWorker main:app. Add NGINX as a reverse proxy. Store uploads on S3-compatible storage for scalability.

Next Steps

  • Add Docker containerization for deployment
  • Learn Redis for caching processed images
  • Explore AWS Lambda for serverless image processing
  • Build the File Converter project for more format conversions

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro