Data Quality & Testing — Great Expectations, dbt Tests & Automated Validation

DodaTech Updated Jun 20, 2026 13 min read

Data quality testing is the practice of automatically validating data against defined expectations — schema, completeness, uniqueness, freshness, and business rules — at every stage of the pipeline, from ingestion to consumption.

What You’ll Learn

This tutorial covers the complete data quality landscape: Great Expectations for declarative data validation, dbt tests for SQL-first quality checks, data profiling for discovering distributions, the six DQ dimensions, building automated validation pipelines, and monitoring data quality in production.

Why It Matters

By some estimates, bad data costs businesses 15-20% of revenue, and it erodes trust in analytics. Testing data is harder than testing code because data changes over time: what’s valid today may fail tomorrow. DodaTech runs automated quality checks on all Doda Browser telemetry pipelines to ensure dashboard accuracy.

Real-World Use

Airbnb uses Great Expectations across 10,000+ datasets to detect data drift. dbt Labs tests every model transformation against source uniqueness and referential integrity. Financial institutions run column-level profiling to detect regulatory violations before reports are generated.

    flowchart LR
    subgraph Sources
        A[Raw Data] --> B[Bronze Layer]
    end
    subgraph Quality[Quality Checks]
        B --> C[Expectations<br/>GE]
        C --> D{Tests Pass?}
        D -->|Yes| E[Silver Layer]
        D -->|No| F[Quarantine]
        B --> G[Profiling]
        G --> H[Anomaly Alert]
    end
    subgraph Monitoring
        E --> I[dbt Tests]
        I --> J{DQ Score}
        J --> K[Dashboard]
        J --> L[PagerDuty]
    end
  

The Six Data Quality Dimensions

Every data quality test maps to one of these dimensions:

Dimension | What It Measures | Example Test
Completeness | Are values missing? | NOT NULL, column != ''
Uniqueness | Are there duplicates? | COUNT(DISTINCT col) = COUNT(col)
Timeliness | Is data fresh? | MAX(updated_at) falls within the last hour
Validity | Does data match format? | Regex pattern, type check
Accuracy | Is data correct? | Cross-reference with source
Consistency | Does data agree across systems? | Row count match between tables

class DataQualityDimension:
    """Evaluate a dataset across all DQ dimensions."""

    @staticmethod
    def completeness(data, column):
        """Completeness: % of non-null values."""
        total = len(data)
        non_null = sum(1 for row in data if row.get(column) is not None)
        return {"dimension": "completeness", "column": column,
                "score": round(non_null / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def uniqueness(data, column):
        """Uniqueness: % of values that are unique."""
        total = len(data)
        values = [row.get(column) for row in data if row.get(column) is not None]
        unique_count = len(set(values))
        return {"dimension": "uniqueness", "column": column,
                "score": round(unique_count / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def validity(data, column, pattern):
        """Validity: % of values matching a pattern."""
        import re
        total = len(data)
        valid = sum(1 for row in data 
                    if row.get(column) and re.match(pattern, str(row[column])))
        return {"dimension": "validity", "column": column,
                "score": round(valid / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def freshness(last_updated, max_age_hours=24):
        """Timeliness: is data recent enough?"""
        from datetime import datetime, timedelta
        age_hours = (datetime.now() - last_updated).total_seconds() / 3600
        passed = age_hours <= max_age_hours
        return {"dimension": "timeliness",
                "age_hours": round(age_hours, 1),
                "max_age_hours": max_age_hours,
                "passed": passed,
                "score": 100 if passed else 0}

# Evaluate sample data
sample_data = [
    {"email": "alice@example.com", "order_id": "ORD-001"},
    {"email": "bob@example.com", "order_id": "ORD-002"},
    {"email": None, "order_id": "ORD-003"},
    {"email": "charlie@example.com", "order_id": "ORD-001"},  # Duplicate order_id
    {"email": "invalid-email", "order_id": "ORD-005"},
]

results = [
    DataQualityDimension.completeness(sample_data, "email"),
    DataQualityDimension.uniqueness(sample_data, "order_id"),
    DataQualityDimension.validity(sample_data, "email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"),
]
for r in results:
    print(f"[{r['dimension']}] {r['column']}: {r['score']}%")

Expected output:

[completeness] email: 80.0%
[uniqueness] order_id: 80.0%
[validity] email: 60.0%
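
The class above scores four of the six dimensions. Accuracy and consistency need a second dataset to compare against, so they could be sketched as standalone functions. This is a minimal illustration, assuming a trusted `source` list joined on a key column; the function names and sample rows are hypothetical and not part of any library.

```python
def accuracy(data, reference, key, column):
    """Accuracy: % of rows whose value matches a trusted reference, joined on key."""
    ref_by_key = {row[key]: row.get(column) for row in reference}
    # Only rows that exist in the reference can be judged for accuracy
    comparable = [row for row in data if row.get(key) in ref_by_key]
    matches = sum(1 for row in comparable
                  if row.get(column) == ref_by_key[row[key]])
    total = len(comparable)
    return {"dimension": "accuracy", "column": column,
            "score": round(matches / total * 100, 2) if total > 0 else 0}


def consistency(table_a, table_b):
    """Consistency: how closely do the row counts of two copies agree?"""
    count_a, count_b = len(table_a), len(table_b)
    larger = max(count_a, count_b)
    return {"dimension": "consistency", "column": "row_count",
            "score": round(min(count_a, count_b) / larger * 100, 2) if larger > 0 else 100}


# Hypothetical data: a warehouse table checked against its source system
warehouse = [
    {"order_id": "ORD-001", "amount": 250.0},
    {"order_id": "ORD-002", "amount": 55.0},   # differs from the source
    {"order_id": "ORD-003", "amount": 10.0},
]
source = [
    {"order_id": "ORD-001", "amount": 250.0},
    {"order_id": "ORD-002", "amount": 50.0},
    {"order_id": "ORD-003", "amount": 10.0},
    {"order_id": "ORD-004", "amount": 99.0},   # never reached the warehouse
]

accuracy_result = accuracy(warehouse, source, "order_id", "amount")
consistency_result = consistency(warehouse, source)
for r in (accuracy_result, consistency_result):
    print(f"[{r['dimension']}] {r['column']}: {r['score']}%")
# [accuracy] amount: 66.67%
# [consistency] row_count: 75.0%
```

A row-count ratio is only a coarse consistency signal; comparing checksums or key sets would catch cases where the counts match but the rows differ.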

Great Expectations — Declarative Data Validation

Great Expectations (GE) is an open-source library that lets you define “expectations” about your data — like assertions for data — and run them against any data source.

Core Concepts

  • Expectation: A declarative statement about data (e.g., “column x must be unique”)
  • Expectation Suite: A collection of expectations for a dataset
  • Data Context: The project configuration and store
  • Checkpoint: A run configuration that connects suites to data
# Simulating Great Expectations validation
class GreatExpectationSuite:
    """Simulate a GE expectation suite."""

    def __init__(self, suite_name):
        self.suite_name = suite_name
        self.expectations = []
        self.results = []

    def expect_column_to_exist(self, column):
        self.expectations.append(("expect_column_to_exist", {"column": column}))

    def expect_column_values_to_not_be_null(self, column):
        self.expectations.append(("expect_column_values_to_not_be_null", {"column": column}))

    def expect_column_values_to_be_unique(self, column):
        self.expectations.append(("expect_column_values_to_be_unique", {"column": column}))

    def expect_column_values_to_match_regex(self, column, regex):
        self.expectations.append(
            ("expect_column_values_to_match_regex", {"column": column, "regex": regex}))

    def expect_column_values_to_be_between(self, column, min_val, max_val):
        self.expectations.append(
            ("expect_column_values_to_be_between", 
             {"column": column, "min": min_val, "max": max_val}))

    def run(self, data):
        """Run all expectations against data."""
        import re
        self.results = []
        for exp_name, kwargs in self.expectations:
            column = kwargs.get("column")
            values = [row.get(column) for row in data]
            non_null = [v for v in values if v is not None]

            if exp_name == "expect_column_to_exist":
                passed = column in (data[0] if data else {})
            elif exp_name == "expect_column_values_to_not_be_null":
                passed = all(v is not None for v in values)
            elif exp_name == "expect_column_values_to_be_unique":
                passed = len(non_null) == len(set(non_null))
            elif exp_name == "expect_column_values_to_match_regex":
                passed = all(re.match(kwargs["regex"], str(v)) 
                           for v in non_null)
            elif exp_name == "expect_column_values_to_be_between":
                passed = all(kwargs["min"] <= v <= kwargs["max"] 
                           for v in non_null)
            else:
                passed = True

            self.results.append({
                "expectation": exp_name,
                "kwargs": kwargs,
                "passed": passed,
                "observed": f"{sum(1 for v in values if v is not None)} values",
            })

        return self.results

    def summary(self):
        total = len(self.results)
        passed = sum(1 for r in self.results if r["passed"])
        print(f"\n=== GE Suite: {self.suite_name} ===")
        print(f"Passed: {passed}/{total}")
        for r in self.results:
            status = "✅" if r["passed"] else "❌"
            print(f"  {status} {r['expectation']}({r['kwargs']}) -> {r['observed']}")

# Create and run a suite
suite = GreatExpectationSuite("orders_suite")
suite.expect_column_to_exist("order_id")
suite.expect_column_values_to_not_be_null("order_id")
suite.expect_column_values_to_be_unique("order_id")
suite.expect_column_values_to_match_regex("order_id", r"^ORD-\d{3}$")
suite.expect_column_values_to_be_between("amount", 0.01, 10000)

order_data = [
    {"order_id": "ORD-001", "amount": 250.0},
    {"order_id": "ORD-002", "amount": 50.0},
    {"order_id": None, "amount": -5.0},  # Null order_id, negative amount
    {"order_id": "ORD-004", "amount": 15000.0},  # Over max
]
suite.run(order_data)
suite.summary()

Expected output:

=== GE Suite: orders_suite ===
Passed: 3/5
  ✅ expect_column_to_exist({'column': 'order_id'}) -> 3 values
  ❌ expect_column_values_to_not_be_null({'column': 'order_id'}) -> 3 values
  ✅ expect_column_values_to_be_unique({'column': 'order_id'}) -> 3 values
  ✅ expect_column_values_to_match_regex({'column': 'order_id', 'regex': '^ORD-\\d{3}$'}) -> 3 values
  ❌ expect_column_values_to_be_between({'column': 'amount', 'min': 0.01, 'max': 10000}) -> 4 values
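
The simulated suite above is all-or-nothing: one null fails the whole expectation. Great Expectations' column-value expectations accept a `mostly` argument that tolerates a fraction of unexpected values. The standalone function below is a simplified stand-in for that idea, not the library's implementation; the function name and the result keys are chosen here for illustration.

```python
def expect_not_null(values, mostly=1.0):
    """Pass when at least `mostly` (0.0-1.0) of the values are non-null."""
    total = len(values)
    unexpected = sum(1 for v in values if v is None)
    # An empty column has nothing unexpected, so treat it as fully passing
    success_ratio = (total - unexpected) / total if total else 1.0
    return {
        "passed": success_ratio >= mostly,
        "unexpected_count": unexpected,
        "unexpected_percent": round(unexpected / total * 100, 2) if total else 0.0,
    }


order_ids = ["ORD-001", "ORD-002", None, "ORD-004"]
strict = expect_not_null(order_ids)                # requires 100% non-null
tolerant = expect_not_null(order_ids, mostly=0.75)  # allows up to 25% nulls
print(strict)
print(tolerant)
# {'passed': False, 'unexpected_count': 1, 'unexpected_percent': 25.0}
# {'passed': True, 'unexpected_count': 1, 'unexpected_percent': 25.0}
```

Reporting the unexpected count alongside the pass/fail flag also makes a failure easier to triage than a bare value count.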

Data Profiling with Great Expectations

GE can also generate an expectation suite automatically by profiling data.

class DataProfiler:
    """Auto-generate expectations by profiling data."""

    def __init__(self, data):
        self.data = data
        self.profile = {}

    def profile_column(self, column):
        values = [row.get(column) for row in self.data if row.get(column) is not None]
        non_null = sum(1 for row in self.data if row.get(column) is not None)
        total = len(self.data)

        profile = {
            "column": column,
            "type": type(values[0]).__name__ if values else "unknown",
            "non_null_count": non_null,
            "null_count": total - non_null,
            "null_pct": round((total - non_null) / total * 100, 2) if total > 0 else 0,
            "unique_count": len(set(values)),
            "min": min(values) if values else None,
            "max": max(values) if values else None,
        }

        if profile["type"] in ("int", "float"):
            numeric = [v for v in values if isinstance(v, (int, float))]
            profile["mean"] = round(sum(numeric) / len(numeric), 2) if numeric else 0
            profile["std"] = round(
                (sum((x - profile["mean"])**2 for x in numeric) / len(numeric))**0.5, 2
            ) if numeric else 0

        self.profile[column] = profile
        return profile

    def suggest_expectations(self):
        """Suggest GE expectations based on profile."""
        suggestions = []
        for col, p in self.profile.items():
            suggestions.append(f"expect_column_to_exist('{col}')")
            if p["null_pct"] > 0:
                suggestions.append(f"expect_column_values_to_not_be_null('{col}') "
                                   f"[FAIL: {p['null_pct']}% nulls]")
            if p["null_pct"] == 0:
                suggestions.append(f"expect_column_values_to_not_be_null('{col}')")
            if p["unique_count"] == p["non_null_count"] and p["non_null_count"] > 0:
                suggestions.append(f"expect_column_values_to_be_unique('{col}')")
            if p["min"] is not None and p["max"] is not None:
                suggestions.append(f"expect_column_values_to_be_between('{col}', "
                                   f"{p['min']}, {p['max']})")
        return suggestions

profiler = DataProfiler(order_data)
for col in ["order_id", "amount"]:
    p = profiler.profile_column(col)
    print(f"Profile {col}: null={p['null_pct']}%, unique={p['unique_count']}, "
          f"range=[{p['min']}, {p['max']}]")

print("\nSuggested expectations:")
for s in profiler.suggest_expectations():
    print(f"  {s}")

Expected output:

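Profile order_id: null=25.0%, unique=3, range=[ORD-001, ORD-004]
Profile amount: null=0.0%, unique=4, range=[-5.0, 15000.0]

Suggested expectations:
  expect_column_to_exist('order_id')
  expect_column_values_to_not_be_null('order_id') [FAIL: 25.0% nulls]
  expect_column_values_to_be_unique('order_id')
  expect_column_values_to_be_between('order_id', ORD-001, ORD-004)
  expect_column_to_exist('amount')
  expect_column_values_to_not_be_null('amount')
  expect_column_values_to_be_unique('amount')
  expect_column_values_to_be_between('amount', -5.0, 15000.0)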

dbt Tests — SQL-Based Quality Checks

dbt provides two types of tests: generic (schema-based) and singular (SQL queries).

Generic Tests

Generic tests are defined in YAML and validate one thing: uniqueness, non-null, accepted values, or referential integrity.

# models/schema.yml — Generic dbt tests
version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'completed', 'cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id

Singular Tests

Singular tests are SQL queries that return failing rows:

-- tests/assert_positive_order_amount.sql
-- Fail if any order amount is negative or zero
SELECT *
FROM {{ ref('stg_orders') }}
WHERE amount <= 0
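
-- tests/assert_recent_data_fresh.sql
-- Fail if the newest order is more than one hour old.
-- Assumes order_date is a timestamp column; DATEADD is written in
-- Snowflake-style syntax, so adjust it for your warehouse.
SELECT MAX(order_date) AS latest_order_date
FROM {{ ref('stg_orders') }}
HAVING MAX(order_date) < DATEADD('hour', -1, CURRENT_TIMESTAMP)
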
# Simulating dbt test execution
class DbtTestRunner:
    """Run dbt-style tests against data."""

    def __init__(self):
        self.tests = []
        self.results = []

    def add_generic_test(self, model, column, test_type, params=None):
        self.tests.append({
            "model": model,
            "column": column,
            "type": test_type,
            "params": params or {},
        })

    def add_singular_test(self, name, sql, models_map):
        self.tests.append({
            "type": "singular",
            "name": name,
            "sql": sql,
            "models": models_map,
        })

    def run(self, data_map):
        """Run all tests against provided data."""
        self.results = []
        for test in self.tests:
            if test["type"] == "singular":
                self._run_singular(test, data_map)
            else:
                self._run_generic(test, data_map)
        return self.results

    def _run_generic(self, test, data_map):
        col = test["column"]
        model = test["model"]
        data = data_map.get(model, [])
        values = [row.get(col) for row in data if row.get(col) is not None]
        total = len(data)

        if test["type"] == "not_null":
            passed = all(row.get(col) is not None for row in data)
            result = (f"not_null({model}.{col})", passed,
                      f"{sum(1 for v in values if v is not None)}/{total} non-null")
        elif test["type"] == "unique":
            passed = len(values) == len(set(values))
            result = (f"unique({model}.{col})", passed,
                      f"{len(values)} values, {len(set(values))} unique")
        elif test["type"] == "accepted_values":
            accepted = set(test["params"].get("values", []))
            bad = [v for v in values if v not in accepted]
            passed = len(bad) == 0
            result = (f"accepted_values({model}.{col})", passed,
                      f"{len(bad)} invalid: {bad[:3]}")
        elif test["type"] == "relationships":
            to_model = test["params"].get("to")
            to_field = test["params"].get("field")
            ref_data = data_map.get(to_model, [])
            ref_values = {row.get(to_field) for row in ref_data if row.get(to_field)}
            orphaned = [v for v in values if v not in ref_values]
            passed = len(orphaned) == 0
            result = (f"relationships({model}.{col} -> {to_model}.{to_field})",
                      passed, f"{len(orphaned)} orphaned: {orphaned[:3]}")
        else:
            result = (test["type"], True, "Unknown test")

        self.results.append(result)

    def _run_singular(self, test, data_map):
        """Simulate running a SQL singular test."""
        sql = test["sql"]
        models = test["models"]
        ref_name = test.get("name", "singular_test")

        # Simple simulation: return rows where condition fails
        model_name = list(models.keys())[0] if models else None
        data = data_map.get(model_name, []) if model_name else []

        # Naive parsing: simulate WHERE clause
        failing = [row for row in data if row.get("amount", 1) <= 0]
        passed = len(failing) == 0
        self.results.append(
            (f"singular.{ref_name}", passed, f"{len(failing)} failing rows"))

    def summary(self):
        total = len(self.results)
        passed_count = sum(1 for _, p, _ in self.results if p)
        print(f"\n=== dbt Test Results ===")
        print(f"Passed: {passed_count}/{total}")
        for name, passed, detail in self.results:
            status = "✅" if passed else "❌"
            print(f"  {status} {name} -> {detail}")

# Run simulated dbt tests
runner = DbtTestRunner()
runner.add_generic_test("stg_orders", "order_id", "not_null")
runner.add_generic_test("stg_orders", "order_id", "unique")
runner.add_generic_test("stg_orders", "status", "accepted_values",
                        {"values": ["pending", "shipped", "completed", "cancelled"]})
runner.add_generic_test("stg_orders", "customer_id", "relationships",
                        {"to": "stg_customers", "field": "customer_id"})
runner.add_singular_test("assert_positive_order_amount",
                         "SELECT * FROM stg_orders WHERE amount <= 0",
                         {"stg_orders": "orders"})

test_data = {
    "stg_orders": [
        {"order_id": "ORD-001", "status": "completed", "customer_id": "C001", "amount": 250.0},
        {"order_id": "ORD-002", "status": "pending", "customer_id": "C002", "amount": 50.0},
        {"order_id": "ORD-003", "status": "unknown", "customer_id": "C003", "amount": -5.0},
        {"order_id": None, "status": "completed", "customer_id": "C001", "amount": 10.0},
        {"order_id": "ORD-001", "status": "completed", "customer_id": "C099", "amount": 100.0},
    ],
    "stg_customers": [
        {"customer_id": "C001", "name": "Alice"},
        {"customer_id": "C002", "name": "Bob"},
    ],
}

runner.run(test_data)
runner.summary()

Expected output:

=== dbt Test Results ===
Passed: 0/5
  ❌ not_null(stg_orders.order_id) -> 4/5 non-null
  ❌ unique(stg_orders.order_id) -> 4 values, 3 unique
  ❌ accepted_values(stg_orders.status) -> 1 invalid: ['unknown']
  ❌ relationships(stg_orders.customer_id -> stg_customers.customer_id) -> 2 orphaned: ['C003', 'C099']
  ❌ singular.assert_positive_order_amount -> 1 failing rows
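
Note that `_run_singular` above ignores the SQL string and always checks `amount <= 0`. One way to make the simulation handle other singular tests is to pass the failing-row condition as a Python callable. The helper below is a hypothetical sketch that returns the same `(name, passed, detail)` tuple shape as `DbtTestRunner`; it is not how dbt itself executes tests.

```python
def run_singular_test(name, rows, is_failing):
    """Return a (name, passed, detail) tuple; rows matching is_failing are failures."""
    failing = [row for row in rows if is_failing(row)]
    return (f"singular.{name}", len(failing) == 0, f"{len(failing)} failing rows")


# Hypothetical sample rows
orders = [
    {"order_id": "ORD-001", "amount": 250.0},
    {"order_id": "ORD-003", "amount": -5.0},
    {"order_id": "ORD-006", "amount": 0.0},
]

# Equivalent of: SELECT * FROM stg_orders WHERE amount <= 0
result = run_singular_test(
    "assert_positive_order_amount", orders, lambda row: row["amount"] <= 0)
print(result)
# ('singular.assert_positive_order_amount', False, '2 failing rows')
```

As with a real singular test, the check passes only when the condition selects zero rows.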

Automated Validation Pipeline

Combine profiling, expectations, and dbt tests into an automated pipeline triggered on every data load.

class AutomatedValidationPipeline:
    """End-to-end automated data quality pipeline."""

    def __init__(self, pipeline_name):
        self.name = pipeline_name
        self.steps = []
        self.scores = {}

    def add_step(self, step_name, check_fn):
        self.steps.append({"name": step_name, "fn": check_fn})

    def run(self, data):
        print(f"\n{'='*50}")
        print(f"Pipeline: {self.name}")
        print(f"{'='*50}")
        for step in self.steps:
            print(f"\n{step['name']}")
            result = step["fn"](data)
            self.scores[step["name"]] = result
            if isinstance(result, dict):
                for k, v in result.items():
                    print(f"  {k}: {v}")

        overall = self.calculate_overall()
        print(f"\n{'='*50}")
        print(f"Overall DQ Score: {overall}%")
        if overall < 80:
            print("STATUS: ❌ FAIL - Blocking pipeline")
        elif overall < 95:
            print("STATUS: ⚠ WARN - Non-blocking alert")
        else:
            print("STATUS: ✅ PASS")
        print(f"{'='*50}")
        return overall

    def calculate_overall(self):
        scores = []
        for name, result in self.scores.items():
            if isinstance(result, dict) and "score" in result:
                scores.append(result["score"])
        return round(sum(scores) / len(scores), 1) if scores else 0

# Build pipeline
dq = AutomatedValidationPipeline("clickstream_quality")

# Step 1: Schema validation
def check_schema(data):
    expected = {"user_id", "page", "event_time", "duration_s"}
    actual = set(data[0].keys()) if data else set()
    missing = expected - actual
    return {"expected_columns": list(expected), "missing": list(missing),
            "score": 100 if not missing else 50}

# Step 2: Completeness
def check_completeness(data):
    total = len(data)
    nulls = sum(1 for row in data if not row.get("user_id"))
    # Guard against an empty batch to avoid ZeroDivisionError
    return {"total_rows": total, "null_user_ids": nulls,
            "score": round((total - nulls) / total * 100, 1) if total else 0}

# Step 3: Freshness
def check_freshness(data):
    from datetime import datetime
    # Assume latest event_time
    times = [row.get("event_time") for row in data if row.get("event_time")]
    if times:
        latest = max(times)
        age_mins = (datetime.now() - latest).total_seconds() / 60
        return {"latest_event": latest.isoformat(), "age_minutes": round(age_mins, 1),
                "score": 100 if age_mins < 60 else 0}
    return {"latest_event": None, "age_minutes": None, "score": 0}

dq.add_step("Schema Validation", check_schema)
dq.add_step("Completeness Check", check_completeness)
dq.add_step("Freshness Check", check_freshness)

from datetime import datetime, timedelta
sample = [
    {"user_id": "u1", "page": "/home", "event_time": datetime.now() - timedelta(minutes=5),
     "duration_s": 30},
    {"user_id": "u2", "page": "/about", "event_time": datetime.now() - timedelta(minutes=10),
     "duration_s": 45},
    {"user_id": None, "page": "/contact", "event_time": datetime.now() - timedelta(hours=2),
     "duration_s": 15},
]
dq.run(sample)

Expected output (the timestamp will differ, and the order of expected_columns can vary between runs because it is built from a set):

==================================================
Pipeline: clickstream_quality
==================================================

Schema Validation
  expected_columns: ['page', 'user_id', 'duration_s', 'event_time']
  missing: []
  score: 100

Completeness Check
  total_rows: 3
  null_user_ids: 1
  score: 66.7

Freshness Check
  latest_event: 2026-06-20T...
  age_minutes: 5.0
  score: 100

==================================================
Overall DQ Score: 88.9%
STATUS: ⚠ WARN - Non-blocking alert
==================================================
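
The pipeline above scores a batch as a whole. The flowchart at the top of this tutorial also routes failing records to quarantine, which calls for a row-level decision. A minimal sketch of that split follows; the rule names and sample events are hypothetical.

```python
def split_batch(rows, rules):
    """Route each row to 'clean' or 'quarantine' based on per-row rules.

    rules maps a rule name to a function that returns True when the row is valid.
    """
    clean, quarantine = [], []
    for row in rows:
        failed = [name for name, is_valid in rules.items() if not is_valid(row)]
        if failed:
            # Keep the failed rule names so the record can be triaged later
            quarantine.append({"row": row, "failed_rules": failed})
        else:
            clean.append(row)
    return clean, quarantine


rules = {
    "user_id_present": lambda r: bool(r.get("user_id")),
    "duration_non_negative": lambda r: isinstance(r.get("duration_s"), (int, float))
                                       and r["duration_s"] >= 0,
}

events = [
    {"user_id": "u1", "page": "/home", "duration_s": 30},
    {"user_id": None, "page": "/contact", "duration_s": 15},
    {"user_id": "u3", "page": "/pricing", "duration_s": -4},
]

clean, quarantine = split_batch(events, rules)
print(f"clean={len(clean)}, quarantined={len(quarantine)}")
for item in quarantine:
    print(item["row"]["page"], item["failed_rules"])
# clean=1, quarantined=2
# /contact ['user_id_present']
# /pricing ['duration_non_negative']
```

Quarantining individual rows lets the valid part of a batch move on to the silver layer instead of blocking the whole load.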

Common Mistakes

1. Testing Only at the End

Quality checks must run at every layer (bronze → silver → gold). A single bad record that slips through an early layer can propagate into every downstream aggregation built on it.

2. Not Testing Source Data

Assume third-party data is always unclean. Run not_null and accepted_values tests on every source immediately after ingestion.

3. No Freshness Alerts

A pipeline that loads stale data silently is worse than a broken pipeline. Always set freshness SLAs with alerts.

4. Tests That Never Fail

If all tests always pass, they’re not testing enough. Review test coverage quarterly and add edge case expectations.

5. Ignoring Data Distribution Drift

Schema tests catch structural changes, but distribution drift (e.g., “status” suddenly has 50% nulls instead of 0%) often goes undetected.
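
A drift check of this kind can be sketched by comparing the null rate of the current batch against a baseline batch. The function names and the 10-percentage-point threshold below are assumptions for illustration, not a recommended default.

```python
def null_rate(rows, column):
    """Fraction of rows where the column is missing or None."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is None) / len(rows)


def detect_null_drift(baseline_rows, current_rows, column, max_increase=0.10):
    """Flag drift when the null rate rises by more than max_increase (absolute)."""
    baseline = null_rate(baseline_rows, column)
    current = null_rate(current_rows, column)
    return {
        "column": column,
        "baseline_null_rate": round(baseline, 3),
        "current_null_rate": round(current, 3),
        "drifted": (current - baseline) > max_increase,
    }


# Hypothetical batches: yesterday had no nulls, today half the rows are null
baseline = [{"status": "shipped"} for _ in range(10)]
current = [{"status": "shipped"}] * 5 + [{"status": None}] * 5
drift_report = detect_null_drift(baseline, current, "status")
print(drift_report)
# {'column': 'status', 'baseline_null_rate': 0.0, 'current_null_rate': 0.5, 'drifted': True}
```

The same pattern extends to other statistics such as mean, distinct count, or category frequencies.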

Practice Questions

  1. What are the six data quality dimensions? Completeness, uniqueness, timeliness, validity, accuracy, consistency.

  2. How does Great Expectations differ from dbt tests? GE is declarative (assertions about data state), engine-agnostic, and runs outside the warehouse. dbt tests are SQL-based, run inside the warehouse, and integrate with dbt models.

  3. What is the difference between a generic and a singular dbt test? Generic tests are defined in YAML schema files (not_null, unique, relationships). Singular tests are custom SQL queries that return failing rows.

  4. What is data profiling and why is it useful? Profiling analyzes column statistics (null %, unique count, min/max, distribution) to discover data characteristics and suggest quality expectations.

  5. Challenge: Design a data quality framework for a real-time pipeline processing 10M events/hour. How would sampling work? What would you alert on?
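
As one possible starting point for the sampling part of the challenge, events could be selected deterministically by hashing an event ID, so the same event is always in or out of the sample regardless of which worker sees it. The `in_sample` helper, the `evt-` ID format, and the 1% rate are assumptions for illustration.

```python
import hashlib


def in_sample(event_id, sample_rate=0.01):
    """Deterministically decide whether an event belongs to the validation sample."""
    digest = hashlib.sha256(event_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a roughly uniform value in [0, 1)
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < sample_rate


event_ids = [f"evt-{i}" for i in range(100_000)]
sampled = [e for e in event_ids if in_sample(e)]
print(f"sampled {len(sampled)} of {len(event_ids)} events")
```

With a 1% rate, roughly 1,000 of the 100,000 IDs should be selected, and re-running the check picks the same events each time.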

Mini Project: Data Quality Dashboard

# dq_dashboard.py
# Simulate a data quality monitoring dashboard

class DQDashboard:
    def __init__(self):
        self.scores = {}
        self.alerts = []
        self.trends = []

    def record_run(self, pipeline_name, score, timestamp=None):
        from datetime import datetime
        ts = timestamp or datetime.now()
        if pipeline_name not in self.scores:
            self.scores[pipeline_name] = []
        self.scores[pipeline_name].append((ts, score))
        self.trends.append({"pipeline": pipeline_name, "timestamp": ts, "score": score})

        if score < 80:
            self.alerts.append({
                "pipeline": pipeline_name,
                "timestamp": ts,
                "score": score,
                "severity": "CRITICAL" if score < 60 else "WARNING",
            })

    def dashboard(self):
        from datetime import datetime, timedelta
        now = datetime.now()
        print(f"\n{'='*55}")
        print(f"  Data Quality Dashboard — {now.date()}")
        print(f"{'='*55}")

        for pipeline, entries in self.scores.items():
            latest = entries[-1][1] if entries else 0
            trend = "↑" if len(entries) > 1 and entries[-1][1] > entries[-2][1] else "↓"
            bar = "█" * int(latest // 10)  # scores are floats; str * float raises TypeError
            print(f"\n{pipeline}")
            print(f"  Latest: {latest}% {trend}  {bar}")

        print(f"\n  Active Alerts: {len(self.alerts)}")
        for alert in self.alerts[:3]:
            print(f"    {alert['severity']}: {alert['pipeline']} "
                  f"scored {alert['score']}%")

# Demo
dash = DQDashboard()
dash.record_run("clickstream", 92.1)
dash.record_run("clickstream", 88.5)
dash.record_run("clickstream", 73.2)  # Warning
dash.record_run("orders", 95.0)
dash.record_run("orders", 100.0)
dash.dashboard()

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.