Data Quality & Testing — Great Expectations, dbt Tests & Automated Validation
Data quality testing is the practice of automatically validating data against defined expectations — schema, completeness, uniqueness, freshness, and business rules — at every stage of the pipeline, from ingestion to consumption.
What You’ll Learn
This tutorial covers the complete data quality landscape: Great Expectations for declarative data validation, dbt tests for SQL-first quality checks, data profiling for discovering distributions, the six DQ dimensions, building automated validation pipelines, and monitoring data quality in production.
Why It Matters
Bad data is often estimated to cost businesses 15-20% of revenue, and it erodes trust in analytics. Testing data is harder than testing code because data changes over time: a check that passes today may fail tomorrow. DodaTech runs automated quality checks on all Doda Browser telemetry pipelines to keep dashboards accurate.
Real-World Use
Airbnb uses Great Expectations across 10,000+ datasets to detect data drift. dbt Labs tests every model transformation against source uniqueness and referential integrity. Financial institutions run column-level profiling to detect regulatory violations before reports are generated.
flowchart LR
subgraph Sources
A[Raw Data] --> B[Bronze Layer]
end
subgraph Quality[Quality Checks]
B --> C[Expectations<br/>GE]
C --> D{Tests Pass?}
D -->|Yes| E[Silver Layer]
D -->|No| F[Quarantine]
B --> G[Profiling]
G --> H[Anomaly Alert]
end
subgraph Monitoring
E --> I[dbt Tests]
I --> J{DQ Score}
J --> K[Dashboard]
J --> L[PagerDuty]
end
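The pass/fail branch in the diagram can be sketched in a few lines of Python. This is only an illustration under stated assumptions: it routes individual rows rather than whole batches, and the `RULES` table, the `route_rows` helper, and the sample rows are hypothetical names invented for the example.

```python
from typing import Callable

# Hypothetical row-level rules; each returns True when the row is acceptable.
RULES: dict[str, Callable[[dict], bool]] = {
    "user_id_present": lambda row: row.get("user_id") is not None,
    "duration_non_negative": lambda row: (row.get("duration_s") or 0) >= 0,
}

def route_rows(rows: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split bronze rows into (silver, quarantine) based on RULES."""
    silver, quarantine = [], []
    for row in rows:
        failed = [name for name, rule in RULES.items() if not rule(row)]
        if failed:
            # Keep the failure reasons with the row so it can be triaged later.
            quarantine.append({**row, "_failed_rules": failed})
        else:
            silver.append(row)
    return silver, quarantine

bronze = [
    {"user_id": "u1", "duration_s": 30},
    {"user_id": None, "duration_s": 12},
    {"user_id": "u3", "duration_s": -4},
]
silver, quarantine = route_rows(bronze)
print(len(silver), "rows promoted;", len(quarantine), "rows quarantined")
```

Storing the failed rule names alongside each quarantined row is a design choice for the sketch: it makes it possible to see why a row was held back without re-running the checks.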
The Six Data Quality Dimensions
Every data quality test maps to one of these dimensions:
| Dimension | What It Measures | Example Test |
|---|---|---|
| Completeness | Are values missing? | NOT NULL, column != '' |
| Uniqueness | Are there duplicates? | COUNT(DISTINCT col) = COUNT(col) |
| Timeliness | Is data fresh? | MAX(updated_at) is within the last hour |
| Validity | Does data match format? | Regex pattern, type check |
| Accuracy | Is data correct? | Cross-reference with source |
| Consistency | Does data agree across systems? | Row count match between tables |
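The consistency row deserves a closer look, because a row count match alone can hide disagreement. The sketch below is a minimal illustration, assuming two in-memory copies of the same dataset; the `consistency` function and the `warehouse` and `reporting` samples are hypothetical.

```python
def consistency(source_rows, target_rows, key):
    """Consistency: do two copies of a dataset agree on row count and keys?"""
    source_keys = {row[key] for row in source_rows}
    target_keys = {row[key] for row in target_rows}
    matched = len(source_keys & target_keys)
    total = len(source_keys | target_keys)
    return {
        "dimension": "consistency",
        "row_count_match": len(source_rows) == len(target_rows),
        "missing_in_target": sorted(source_keys - target_keys),
        "unexpected_in_target": sorted(target_keys - source_keys),
        # Share of all keys that appear on both sides.
        "score": round(matched / total * 100, 2) if total else 100.0,
    }

warehouse = [{"order_id": "ORD-001"}, {"order_id": "ORD-002"}, {"order_id": "ORD-003"}]
reporting = [{"order_id": "ORD-001"}, {"order_id": "ORD-003"}, {"order_id": "ORD-004"}]
report = consistency(warehouse, reporting, "order_id")
print(report)
```

Here both tables hold three rows, so the row count check passes, yet only half of the keys are shared. Comparing keys as well as counts catches that case.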
class DataQualityDimension:
    """Evaluate a dataset across all DQ dimensions."""

    @staticmethod
    def completeness(data, column):
        """Completeness: % of non-null values."""
        total = len(data)
        non_null = sum(1 for row in data if row.get(column) is not None)
        return {"dimension": "completeness", "column": column,
                "score": round(non_null / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def uniqueness(data, column):
        """Uniqueness: % of values that are unique."""
        total = len(data)
        values = [row.get(column) for row in data if row.get(column) is not None]
        unique_count = len(set(values))
        return {"dimension": "uniqueness", "column": column,
                "score": round(unique_count / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def validity(data, column, pattern):
        """Validity: % of values matching a pattern."""
        import re
        total = len(data)
        valid = sum(1 for row in data
                    if row.get(column) and re.match(pattern, str(row[column])))
        return {"dimension": "validity", "column": column,
                "score": round(valid / total * 100, 2) if total > 0 else 0}

    @staticmethod
    def freshness(last_updated, max_age_hours=24):
        """Timeliness: is data recent enough?"""
        from datetime import datetime, timedelta
        age_hours = (datetime.now() - last_updated).total_seconds() / 3600
        passed = age_hours <= max_age_hours
        return {"dimension": "timeliness",
                "age_hours": round(age_hours, 1),
                "max_age_hours": max_age_hours,
                "passed": passed,
                "score": 100 if passed else 0}

# Evaluate sample data
sample_data = [
    {"email": "alice@example.com", "order_id": "ORD-001"},
    {"email": "bob@example.com", "order_id": "ORD-002"},
    {"email": None, "order_id": "ORD-003"},
    {"email": "charlie@example.com", "order_id": "ORD-001"},  # Duplicate order_id
    {"email": "invalid-email", "order_id": "ORD-005"},
]
results = [
    DataQualityDimension.completeness(sample_data, "email"),
    DataQualityDimension.uniqueness(sample_data, "order_id"),
    DataQualityDimension.validity(sample_data, "email", r"^[\w.+-]+@[\w-]+\.[\w.]+$"),
]
for r in results:
    print(f"[{r['dimension']}] {r['column']}: {r['score']}%")

Expected output:

[completeness] email: 80.0%
[uniqueness] order_id: 80.0%
[validity] email: 60.0%

Great Expectations: Declarative Data Validation
Great Expectations (GE) is an open-source library that lets you define “expectations” about your data — like assertions for data — and run them against any data source.
Core Concepts
- Expectation: A declarative statement about data (e.g., “column x must be unique”)
- Expectation Suite: A collection of expectations for a dataset
- Data Context: The project configuration and store
- Checkpoint: A run configuration that connects suites to data
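How these concepts relate can be sketched with plain Python dataclasses. This is not the Great Expectations API: the class names below only mirror the concept names, every field and method is a hypothetical stand-in, and the Data Context (which holds project configuration) is left out.

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Expectation:
    """A named, declarative check over a list of row dicts."""
    name: str
    check: Callable[[list[dict]], bool]

@dataclass
class ExpectationSuite:
    """A collection of expectations for one dataset."""
    name: str
    expectations: list[Expectation] = field(default_factory=list)

@dataclass
class Checkpoint:
    """Binds a suite to a batch of data and runs every expectation in it."""
    suite: ExpectationSuite

    def run(self, batch: list[dict]) -> dict:
        results = {e.name: e.check(batch) for e in self.suite.expectations}
        return {"suite": self.suite.name,
                "success": all(results.values()),
                "results": results}

users_suite = ExpectationSuite("users_suite", [
    Expectation("user_id is never null",
                lambda rows: all(r.get("user_id") is not None for r in rows)),
    Expectation("user_id is unique",
                lambda rows: len({r.get("user_id") for r in rows}) == len(rows)),
])
outcome = Checkpoint(users_suite).run([{"user_id": 1}, {"user_id": 2}, {"user_id": 2}])
print(outcome)
```

The point of the separation is that the suite describes what should be true, while the checkpoint decides which data it is run against and when.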
# Simulating Great Expectations validation
class GreatExpectationSuite:
    """Simulate a GE expectation suite."""

    def __init__(self, suite_name):
        self.suite_name = suite_name
        self.expectations = []
        self.results = []

    def expect_column_to_exist(self, column):
        self.expectations.append(("expect_column_to_exist", {"column": column}))

    def expect_column_values_to_not_be_null(self, column):
        self.expectations.append(("expect_column_values_to_not_be_null", {"column": column}))

    def expect_column_values_to_be_unique(self, column):
        self.expectations.append(("expect_column_values_to_be_unique", {"column": column}))

    def expect_column_values_to_match_regex(self, column, regex):
        self.expectations.append(
            ("expect_column_values_to_match_regex", {"column": column, "regex": regex}))

    def expect_column_values_to_be_between(self, column, min_val, max_val):
        self.expectations.append(
            ("expect_column_values_to_be_between",
             {"column": column, "min": min_val, "max": max_val}))

    def run(self, data):
        """Run all expectations against data."""
        import re
        self.results = []
        for exp_name, kwargs in self.expectations:
            column = kwargs.get("column")
            values = [row.get(column) for row in data]
            non_null = [v for v in values if v is not None]
            if exp_name == "expect_column_to_exist":
                passed = column in (data[0] if data else {})
            elif exp_name == "expect_column_values_to_not_be_null":
                passed = all(v is not None for v in values)
            elif exp_name == "expect_column_values_to_be_unique":
                passed = len(non_null) == len(set(non_null))
            elif exp_name == "expect_column_values_to_match_regex":
                passed = all(re.match(kwargs["regex"], str(v))
                             for v in non_null)
            elif exp_name == "expect_column_values_to_be_between":
                passed = all(kwargs["min"] <= v <= kwargs["max"]
                             for v in non_null)
            else:
                passed = True
            self.results.append({
                "expectation": exp_name,
                "kwargs": kwargs,
                "passed": passed,
                "observed": f"{sum(1 for v in values if v is not None)} values",
            })
        return self.results

    def summary(self):
        total = len(self.results)
        passed = sum(1 for r in self.results if r["passed"])
        print(f"\n=== GE Suite: {self.suite_name} ===")
        print(f"Passed: {passed}/{total}")
        for r in self.results:
            status = "✅" if r["passed"] else "❌"
            print(f" {status} {r['expectation']}({r['kwargs']}) -> {r['observed']}")

# Create and run a suite
suite = GreatExpectationSuite("orders_suite")
suite.expect_column_to_exist("order_id")
suite.expect_column_values_to_not_be_null("order_id")
suite.expect_column_values_to_be_unique("order_id")
suite.expect_column_values_to_match_regex("order_id", r"^ORD-\d{3}$")
suite.expect_column_values_to_be_between("amount", 0.01, 10000)
order_data = [
    {"order_id": "ORD-001", "amount": 250.0},
    {"order_id": "ORD-002", "amount": 50.0},
    {"order_id": None, "amount": -5.0},  # Null order_id, negative amount
    {"order_id": "ORD-004", "amount": 15000.0},  # Over max
]
suite.run(order_data)
suite.summary()

Expected output (the uniqueness and regex checks skip nulls, so both pass on the three non-null order IDs):

=== GE Suite: orders_suite ===
Passed: 3/5
 ✅ expect_column_to_exist({'column': 'order_id'}) -> 3 values
 ❌ expect_column_values_to_not_be_null({'column': 'order_id'}) -> 3 values
 ✅ expect_column_values_to_be_unique({'column': 'order_id'}) -> 3 values
 ✅ expect_column_values_to_match_regex({'column': 'order_id', 'regex': '^ORD-\\d{3}$'}) -> 3 values
 ❌ expect_column_values_to_be_between({'column': 'amount', 'min': 0.01, 'max': 10000}) -> 4 values

Data Profiling with Great Expectations
GE can also generate an expectation suite automatically by profiling data.
class DataProfiler:
    """Auto-generate expectations by profiling data."""

    def __init__(self, data):
        self.data = data
        self.profile = {}

    def profile_column(self, column):
        values = [row.get(column) for row in self.data if row.get(column) is not None]
        non_null = sum(1 for row in self.data if row.get(column) is not None)
        total = len(self.data)
        profile = {
            "column": column,
            "type": type(values[0]).__name__ if values else "unknown",
            "non_null_count": non_null,
            "null_count": total - non_null,
            "null_pct": round((total - non_null) / total * 100, 2) if total > 0 else 0,
            "unique_count": len(set(values)),
            "min": min(values) if values else None,
            "max": max(values) if values else None,
        }
        if profile["type"] in ("int", "float"):
            numeric = [v for v in values if isinstance(v, (int, float))]
            profile["mean"] = round(sum(numeric) / len(numeric), 2) if numeric else 0
            profile["std"] = round(
                (sum((x - profile["mean"])**2 for x in numeric) / len(numeric))**0.5, 2
            ) if numeric else 0
        self.profile[column] = profile
        return profile

    def suggest_expectations(self):
        """Suggest GE expectations based on profile."""
        suggestions = []
        for col, p in self.profile.items():
            suggestions.append(f"expect_column_to_exist('{col}')")
            if p["null_pct"] > 0:
                suggestions.append(f"expect_column_values_to_not_be_null('{col}') "
                                   f"[FAIL: {p['null_pct']}% nulls]")
            if p["null_pct"] == 0:
                suggestions.append(f"expect_column_values_to_not_be_null('{col}')")
            if p["unique_count"] == p["non_null_count"] and p["non_null_count"] > 0:
                suggestions.append(f"expect_column_values_to_be_unique('{col}')")
            if p["min"] is not None and p["max"] is not None:
                suggestions.append(f"expect_column_values_to_be_between('{col}', "
                                   f"{p['min']}, {p['max']})")
        return suggestions

profiler = DataProfiler(order_data)
for col in ["order_id", "amount"]:
    p = profiler.profile_column(col)
    print(f"Profile {col}: null={p['null_pct']}%, unique={p['unique_count']}, "
          f"range=[{p['min']}, {p['max']}]")
print("\nSuggested expectations:")
for s in profiler.suggest_expectations():
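    print(f" {s}")

Expected output (string columns get a lexicographic min and max, so order_id also receives a range suggestion):

Profile order_id: null=25.0%, unique=3, range=[ORD-001, ORD-004]
Profile amount: null=0.0%, unique=4, range=[-5.0, 15000.0]

Suggested expectations:
 expect_column_to_exist('order_id')
 expect_column_values_to_not_be_null('order_id') [FAIL: 25.0% nulls]
 expect_column_values_to_be_unique('order_id')
 expect_column_values_to_be_between('order_id', ORD-001, ORD-004)
 expect_column_to_exist('amount')
 expect_column_values_to_not_be_null('amount')
 expect_column_values_to_be_unique('amount')
 expect_column_values_to_be_between('amount', -5.0, 15000.0)

dbt Tests: SQL-Based Quality Checks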
dbt provides two types of tests: generic (schema-based) and singular (SQL queries).
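Both kinds of test come down to the same idea: a query that selects failing rows, where the test passes if the query returns none. The sketch below illustrates that idea with Python's built-in `sqlite3` module rather than dbt itself. The table contents and the `run_test` helper are assumptions for the example, and the SQL only approximates what dbt would compile.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_orders (order_id TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO stg_orders VALUES (?, ?)",
    [("ORD-001", 250.0), ("ORD-002", 50.0), ("ORD-002", -5.0)],
)

def run_test(name: str, failing_rows_sql: str) -> bool:
    """A test passes when its query returns zero failing rows."""
    failures = conn.execute(failing_rows_sql).fetchall()
    print(f"{'PASS' if not failures else 'FAIL'} {name}: {len(failures)} failing rows")
    return not failures

# Roughly what a generic `unique` test checks: keys that appear more than once.
unique_ok = run_test(
    "unique(order_id)",
    "SELECT order_id FROM stg_orders WHERE order_id IS NOT NULL "
    "GROUP BY order_id HAVING COUNT(*) > 1",
)
# Roughly what a generic `not_null` test checks.
not_null_ok = run_test(
    "not_null(order_id)",
    "SELECT * FROM stg_orders WHERE order_id IS NULL",
)
# A singular-style test: hand-written SQL that selects the offending rows.
positive_ok = run_test(
    "assert_positive_order_amount",
    "SELECT * FROM stg_orders WHERE amount <= 0",
)
```

With this sample data the `unique` and positive-amount checks fail with one row each, while `not_null` passes.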
Generic Tests
Generic tests are defined in YAML and validate one thing: uniqueness, non-null, accepted values, or referential integrity.
# models/schema.yml: generic dbt tests
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'completed', 'cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id

Singular Tests
Singular tests are SQL queries that return failing rows:
-- tests/assert_positive_order_amount.sql
-- Fail if any order amount is negative or zero
SELECT *
FROM {{ ref('stg_orders') }}
WHERE amount <= 0

-- tests/assert_recent_data_fresh.sql
-- Fail if the newest order is more than one hour old (or the table is empty)
SELECT MAX(order_date) AS latest_order
FROM {{ ref('stg_orders') }}
HAVING MAX(order_date) IS NULL
    OR MAX(order_date) < DATEADD('hour', -1, CURRENT_TIMESTAMP)

# Simulating dbt test execution
class DbtTestRunner:
    """Run dbt-style tests against data."""

    def __init__(self):
        self.tests = []
        self.results = []

    def add_generic_test(self, model, column, test_type, params=None):
        self.tests.append({
            "model": model,
            "column": column,
            "type": test_type,
            "params": params or {},
        })

    def add_singular_test(self, name, sql, models_map):
        self.tests.append({
            "type": "singular",
            "name": name,
            "sql": sql,
            "models": models_map,
        })

    def run(self, data_map):
        """Run all tests against provided data."""
        self.results = []
        for test in self.tests:
            if test["type"] == "singular":
                self._run_singular(test, data_map)
            else:
                self._run_generic(test, data_map)
        return self.results

    def _run_generic(self, test, data_map):
        col = test["column"]
        model = test["model"]
        data = data_map.get(model, [])
        values = [row.get(col) for row in data if row.get(col) is not None]
        total = len(data)
        if test["type"] == "not_null":
            passed = all(row.get(col) is not None for row in data)
            result = (f"not_null({model}.{col})", passed,
                      f"{sum(1 for v in values if v is not None)}/{total} non-null")
        elif test["type"] == "unique":
            passed = len(values) == len(set(values))
            result = (f"unique({model}.{col})", passed,
                      f"{len(values)} values, {len(set(values))} unique")
        elif test["type"] == "accepted_values":
            accepted = set(test["params"].get("values", []))
            bad = [v for v in values if v not in accepted]
            passed = len(bad) == 0
            result = (f"accepted_values({model}.{col})", passed,
                      f"{len(bad)} invalid: {bad[:3]}")
        elif test["type"] == "relationships":
            to_model = test["params"].get("to")
            to_field = test["params"].get("field")
            ref_data = data_map.get(to_model, [])
            ref_values = {row.get(to_field) for row in ref_data if row.get(to_field)}
            orphaned = [v for v in values if v not in ref_values]
            passed = len(orphaned) == 0
            result = (f"relationships({model}.{col} -> {to_model}.{to_field})",
                      passed, f"{len(orphaned)} orphaned: {orphaned[:3]}")
        else:
            result = (test["type"], True, "Unknown test")
        self.results.append(result)

    def _run_singular(self, test, data_map):
        """Simulate running a SQL singular test."""
        sql = test["sql"]
        models = test["models"]
        ref_name = test.get("name", "singular_test")
        # Simple simulation: return rows where condition fails
        model_name = list(models.keys())[0] if models else None
        data = data_map.get(model_name, []) if model_name else []
        # Naive parsing: simulate WHERE clause
        failing = [row for row in data if row.get("amount", 1) <= 0]
        passed = len(failing) == 0
        self.results.append(
            (f"singular.{ref_name}", passed, f"{len(failing)} failing rows"))

    def summary(self):
        total = len(self.results)
        passed_count = sum(1 for _, p, _ in self.results if p)
        print(f"\n=== dbt Test Results ===")
        print(f"Passed: {passed_count}/{total}")
        for name, passed, detail in self.results:
            status = "✅" if passed else "❌"
            print(f" {status} {name} -> {detail}")

# Run simulated dbt tests
runner = DbtTestRunner()
runner.add_generic_test("stg_orders", "order_id", "not_null")
runner.add_generic_test("stg_orders", "order_id", "unique")
runner.add_generic_test("stg_orders", "status", "accepted_values",
                        {"values": ["pending", "shipped", "completed", "cancelled"]})
runner.add_generic_test("stg_orders", "customer_id", "relationships",
                        {"to": "stg_customers", "field": "customer_id"})
runner.add_singular_test("assert_positive_order_amount",
                         "SELECT * FROM stg_orders WHERE amount <= 0",
                         {"stg_orders": "orders"})
test_data = {
    "stg_orders": [
        {"order_id": "ORD-001", "status": "completed", "customer_id": "C001", "amount": 250.0},
        {"order_id": "ORD-002", "status": "pending", "customer_id": "C002", "amount": 50.0},
        {"order_id": "ORD-003", "status": "unknown", "customer_id": "C003", "amount": -5.0},
        {"order_id": None, "status": "completed", "customer_id": "C001", "amount": 10.0},
        {"order_id": "ORD-001", "status": "completed", "customer_id": "C099", "amount": 100.0},
    ],
    "stg_customers": [
        {"customer_id": "C001", "name": "Alice"},
        {"customer_id": "C002", "name": "Bob"},
    ],
}
runner.run(test_data)
runner.summary()

Expected output (all five tests fail; both C003 and C099 are missing from stg_customers):

=== dbt Test Results ===
Passed: 0/5
 ❌ not_null(stg_orders.order_id) -> 4/5 non-null
 ❌ unique(stg_orders.order_id) -> 4 values, 3 unique
 ❌ accepted_values(stg_orders.status) -> 1 invalid: ['unknown']
 ❌ relationships(stg_orders.customer_id -> stg_customers.customer_id) -> 2 orphaned: ['C003', 'C099']
 ❌ singular.assert_positive_order_amount -> 1 failing rows

Automated Validation Pipeline
Combine profiling, expectations, and dbt tests into an automated pipeline triggered on every data load.
class AutomatedValidationPipeline:
    """End-to-end automated data quality pipeline."""

    def __init__(self, pipeline_name):
        self.name = pipeline_name
        self.steps = []
        self.scores = {}

    def add_step(self, step_name, check_fn):
        self.steps.append({"name": step_name, "fn": check_fn})

    def run(self, data):
        print(f"\n{'='*50}")
        print(f"Pipeline: {self.name}")
        print(f"{'='*50}")
        for step in self.steps:
            print(f"\n▶ {step['name']}")
            result = step["fn"](data)
            self.scores[step["name"]] = result
            if isinstance(result, dict):
                for k, v in result.items():
                    print(f" {k}: {v}")
        overall = self.calculate_overall()
        print(f"\n{'='*50}")
        print(f"Overall DQ Score: {overall}%")
        if overall < 80:
            print("STATUS: ❌ FAIL - Blocking pipeline")
        elif overall < 95:
            print("STATUS: ⚠ WARN - Non-blocking alert")
        else:
            print("STATUS: ✅ PASS")
        print(f"{'='*50}")
        return overall

    def calculate_overall(self):
        scores = []
        for name, result in self.scores.items():
            if isinstance(result, dict) and "score" in result:
                scores.append(result["score"])
        return round(sum(scores) / len(scores), 1) if scores else 0

# Build pipeline
dq = AutomatedValidationPipeline("clickstream_quality")

# Step 1: Schema validation
def check_schema(data):
    expected = {"user_id", "page", "event_time", "duration_s"}
    actual = set(data[0].keys()) if data else set()
    missing = expected - actual
    return {"expected_columns": list(expected), "missing": list(missing),
            "score": 100 if not missing else 50}

# Step 2: Completeness
def check_completeness(data):
    total = len(data)
    nulls = sum(1 for row in data if not row.get("user_id"))
    # Guard against an empty batch to avoid dividing by zero
    return {"total_rows": total, "null_user_ids": nulls,
            "score": round((total - nulls) / total * 100, 1) if total else 0}

# Step 3: Freshness
def check_freshness(data):
    from datetime import datetime
    # Use the most recent event_time in the batch
    times = [row.get("event_time") for row in data if row.get("event_time")]
    if times:
        latest = max(times)
        age_mins = (datetime.now() - latest).total_seconds() / 60
        return {"latest_event": latest.isoformat(), "age_minutes": round(age_mins, 1),
                "score": 100 if age_mins < 60 else 0}
    return {"latest_event": None, "age_minutes": None, "score": 0}

dq.add_step("Schema Validation", check_schema)
dq.add_step("Completeness Check", check_completeness)
dq.add_step("Freshness Check", check_freshness)

from datetime import datetime, timedelta
sample = [
    {"user_id": "u1", "page": "/home", "event_time": datetime.now() - timedelta(minutes=5),
     "duration_s": 30},
    {"user_id": "u2", "page": "/about", "event_time": datetime.now() - timedelta(minutes=10),
     "duration_s": 45},
    {"user_id": None, "page": "/contact", "event_time": datetime.now() - timedelta(hours=2),
     "duration_s": 15},
]
dq.run(sample)

Expected output:

==================================================
Pipeline: clickstream_quality
==================================================
▶ Schema Validation
 expected_columns: ['page', 'user_id', 'duration_s', 'event_time']
 missing: []
 score: 100
▶ Completeness Check
 total_rows: 3
 null_user_ids: 1
 score: 66.7
▶ Freshness Check
 latest_event: 2026-06-20T...
 age_minutes: 5.0
 score: 100
==================================================
Overall DQ Score: 88.9%
STATUS: ⚠ WARN - Non-blocking alert
==================================================

Common Mistakes
1. Testing Only at the End
Run quality checks at every layer (bronze → silver → gold). A single bad record that slips through can skew every downstream aggregation built on it.
2. Not Testing Source Data
Assume third-party data is always unclean. Run not_null and accepted_values tests on every source immediately after ingestion.
3. No Freshness Alerts
A pipeline that loads stale data silently is worse than a broken pipeline. Always set freshness SLAs with alerts.
4. Tests That Never Fail
If every test always passes, the suite may not be testing enough. Review test coverage regularly, for example quarterly, and add expectations for edge cases.
5. Ignoring Data Distribution Drift
Schema tests catch structural changes, but distribution drift (e.g., “status” suddenly has 50% nulls instead of 0%) often goes undetected.
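A simple way to catch the null-rate case described above is to compare each batch against a baseline. The following is a minimal sketch under assumptions: the `null_rate` and `null_rate_drift` helpers, the 10-point tolerance, and the sample batches are all hypothetical, and a production check would likely track more statistics than the null rate.

```python
def null_rate(rows, column):
    """Fraction of rows where `column` is None (0.0 for an empty batch)."""
    if not rows:
        return 0.0
    return sum(1 for row in rows if row.get(column) is None) / len(rows)

def null_rate_drift(baseline_rows, current_rows, column, tolerance=0.10):
    """Flag drift when the null rate moves more than `tolerance` from the baseline."""
    baseline = null_rate(baseline_rows, column)
    current = null_rate(current_rows, column)
    return {
        "column": column,
        "baseline_null_rate": round(baseline, 2),
        "current_null_rate": round(current, 2),
        "drifted": abs(current - baseline) > tolerance,
    }

yesterday = [{"status": "shipped"}] * 10
today = [{"status": "shipped"}] * 5 + [{"status": None}] * 5
drift = null_rate_drift(yesterday, today, "status")
print(drift)
# → {'column': 'status', 'baseline_null_rate': 0.0, 'current_null_rate': 0.5, 'drifted': True}
```

Both batches have the same schema, so a structural test would pass on each; only the comparison between them exposes the change.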
Practice Questions
What are the six data quality dimensions? Completeness, uniqueness, timeliness, validity, accuracy, consistency.
How does Great Expectations differ from dbt tests? GE is declarative (assertions about data state), engine-agnostic, and runs outside the warehouse. dbt tests are SQL-based, run inside the warehouse, and integrate with dbt models.
What is the difference between a generic and a singular dbt test? Generic tests are defined in YAML schema files (not_null, unique, relationships). Singular tests are custom SQL queries that return failing rows.
What is data profiling and why is it useful? Profiling analyzes column statistics (null %, unique count, min/max, distribution) to discover data characteristics and suggest quality expectations.
Challenge: Design a data quality framework for a real-time pipeline processing 10M events/hour. How would sampling work? What would you alert on?
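One possible starting point for the sampling part of the challenge is reservoir sampling, which keeps a fixed-size uniform sample from a stream without knowing its length in advance. The sketch below is an assumption about how one might approach it, not a prescribed answer; the `reservoir_sample` helper, the synthetic event stream, and the sample size are hypothetical.

```python
import random

def reservoir_sample(stream, k, seed=None):
    """Keep a uniform random sample of k items from a stream of unknown length."""
    rng = random.Random(seed)
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items.
            sample.append(item)
        else:
            # Item i replaces a reservoir slot with probability k / (i + 1).
            j = rng.randint(0, i)
            if j < k:
                sample[j] = item
    return sample

# Synthetic stream: every 50th event is missing its user_id.
events = ({"event_id": n, "user_id": None if n % 50 == 0 else f"u{n}"}
          for n in range(100_000))
sample = reservoir_sample(events, k=1_000, seed=42)
estimated_null_rate = sum(1 for e in sample if e["user_id"] is None) / len(sample)
print(f"sampled {len(sample)} events, estimated null rate {estimated_null_rate:.1%}")
```

Quality checks can then run on the sample instead of the full stream, at the cost of an estimate rather than an exact figure. Rare defects may need a larger sample or a targeted full scan.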
Mini Project: Data Quality Dashboard
# dq_dashboard.py
# Simulate a data quality monitoring dashboard
class DQDashboard:
    def __init__(self):
        self.scores = {}
        self.alerts = []
        self.trends = []

    def record_run(self, pipeline_name, score, timestamp=None):
        from datetime import datetime
        ts = timestamp or datetime.now()
        if pipeline_name not in self.scores:
            self.scores[pipeline_name] = []
        self.scores[pipeline_name].append((ts, score))
        self.trends.append({"pipeline": pipeline_name, "timestamp": ts, "score": score})
        if score < 80:
            self.alerts.append({
                "pipeline": pipeline_name,
                "timestamp": ts,
                "score": score,
                "severity": "CRITICAL" if score < 60 else "WARNING",
            })

    def dashboard(self):
        from datetime import datetime, timedelta
        now = datetime.now()
        print(f"\n{'='*55}")
        print(f" Data Quality Dashboard - {now.date()}")
        print(f"{'='*55}")
        for pipeline, entries in self.scores.items():
            latest = entries[-1][1] if entries else 0
            trend = "↑" if len(entries) > 1 and entries[-1][1] > entries[-2][1] else "↓"
            # Scores are floats, so convert before repeating the bar character
            bar = "█" * int(latest // 10)
            print(f"\n{pipeline}")
            print(f" Latest: {latest}% {trend} {bar}")
        print(f"\n Active Alerts: {len(self.alerts)}")
        for alert in self.alerts[:3]:
            print(f" {alert['severity']}: {alert['pipeline']} "
                  f"scored {alert['score']}%")

# Demo
dash = DQDashboard()
dash.record_run("clickstream", 92.1)
dash.record_run("clickstream", 88.5)
dash.record_run("clickstream", 73.2)  # Warning
dash.record_run("orders", 95.0)
dash.record_run("orders", 100.0)
dash.dashboard()

What’s Next
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.