Data Catalog & Lineage — Atlas, DataHub, Amundsen & Column-Level Lineage
A data catalog is a metadata management platform that indexes datasets, tracks lineage from source to consumption, enables discovery, and powers impact analysis — the nervous system of a modern data platform.
What You’ll Learn
This tutorial covers data catalog fundamentals: Apache Atlas for governance, DataHub for real-time metadata, Amundsen for data discovery, column-level lineage tracking, impact analysis workflows, and how catalogs integrate with data lakes, warehouses, and processing engines.
Why It Matters
Enterprises with thousands of datasets need a way to answer: Where did this data come from? Who depends on it? What happens if I change this column? Without a catalog, data teams spend a large share of their time (commonly cited as 30-40%) just finding and understanding data. DodaTech uses catalogs to manage Doda Browser telemetry and Durga Antivirus Pro threat-intelligence data.
Real-World Use
LinkedIn developed DataHub to track metadata across thousands of datasets, pipelines, and ML models. Uber built Databook for similar needs. Financial institutions use Apache Atlas with Apache Ranger for compliance-driven data governance (GDPR, CCPA, SOX).
```mermaid
flowchart TD
    subgraph Sources
        A[Databases]
        B[Data Lakes]
        C[Streams/Kafka]
    end
    subgraph Catalog[Data Catalog]
        D[Data Catalog]
        E[Discovery]
        F[Lineage]
        G[Governance]
        H[Glossary]
    end
    subgraph Consumers
        I[Data Scientists]
        J[Data Engineers]
        K[Compliance]
        L[Business Analysts]
    end
    A --> D
    B --> D
    C --> D
    D --> E
    D --> F
    D --> G
    D --> H
    E --> I
    F --> J
    G --> K
    H --> L
```
Why Data Catalogs Matter
Think of a data catalog as the card catalog of a library. Without it, finding the right book means wandering through shelves at random. With it, you find exactly what you need in seconds — plus you see which shelves connect to each other.
Key Functions
| Function | Description | Stakeholder |
|---|---|---|
| Discovery | Find datasets by name, tag, domain | All data users |
| Lineage | Show data flow: source → transform → report | Engineers, auditors |
| Impact Analysis | What breaks if I change this column? | Engineers |
| Governance | Tags, classifications, access control | Compliance |
| Glossary | Business definitions for technical assets | Analysts |
| Profiling | Statistics: row count, null %, distinct values | All |
Apache Atlas — Governance-Focused Catalog
Apache Atlas is an open-source metadata and governance platform designed for the Hadoop ecosystem. It provides data classification, lineage, and a business glossary, and its classifications drive tag-based access policies when Atlas is paired with Apache Ranger.
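The `AtlasClient` simulation in this section keeps everything in memory. Against a real Atlas server the same operations go through its v2 REST API. The sketch below only *builds* the requests and sends nothing. It assumes a server at `http://localhost:21000` (Atlas's default port), the `/api/atlas/v2/entity` and `/api/atlas/v2/lineage/{guid}` endpoints, and the Hive hook's `db.table@cluster` naming for qualified names. The helper names are hypothetical. Real `hive_table` entities usually need further attributes, such as their database, that are left out here.

```python
import json
import urllib.request

ATLAS_URL = "http://localhost:21000"  # assumption: local Atlas on its default port


def create_entity_request(type_name, attributes):
    """Build (but do not send) a POST that creates or updates one entity."""
    body = {"entity": {"typeName": type_name, "attributes": attributes}}
    return urllib.request.Request(
        f"{ATLAS_URL}/api/atlas/v2/entity",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def entity_ref(type_name, qualified_name):
    """Reference an existing entity by its unique qualifiedName instead of a GUID."""
    return {"typeName": type_name, "uniqueAttributes": {"qualifiedName": qualified_name}}


def lineage_request(guid, direction="BOTH", depth=3):
    """Build a GET for an entity's lineage graph (direction: INPUT, OUTPUT, BOTH)."""
    return urllib.request.Request(
        f"{ATLAS_URL}/api/atlas/v2/lineage/{guid}?direction={direction}&depth={depth}",
        method="GET",
    )


# Atlas derives lineage from Process entities that list their inputs and outputs.
process_req = create_entity_request("Process", {
    "qualifiedName": "etl.clean_clicks@dodatech_cluster",
    "name": "clean_clicks_job",
    "inputs": [entity_ref("hive_table", "raw_db.clicks@dodatech_cluster")],
    "outputs": [entity_ref("hive_table", "analytics.daily_metrics@dodatech_cluster")],
})
```

Modeling lineage as a Process entity with `inputs` and `outputs` is what lets Atlas answer lineage queries in either direction. The simulation flattens this into a `(source, target)` dictionary.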
```python
# Simulating Apache Atlas metadata operations (in memory, no server needed)
class AtlasClient:
    """Simulate interaction with the Apache Atlas REST API."""

    def __init__(self, cluster_name="dodatech_cluster"):
        self.cluster = cluster_name
        self._entities = {}         # qualifiedName -> entity
        self._classifications = {}  # qualifiedName -> [classification names]
        self._lineage = {}          # (source, target) -> process info

    def create_entity(self, entity_type, qualified_name, attributes):
        """Register a metadata entity."""
        entity = {
            "typeName": entity_type,
            "attributes": {
                "qualifiedName": qualified_name,
                "name": attributes.get("name"),
                "description": attributes.get("description"),
                "owner": attributes.get("owner"),
                "createTime": "2026-06-20T10:00:00Z",
            },
        }
        self._entities[qualified_name] = entity
        print(f"[Atlas] Created {entity_type}: {qualified_name}")
        return qualified_name

    def add_classification(self, qualified_name, classification_name):
        """Tag an entity with a classification (PII, sensitive, etc.)."""
        self._classifications.setdefault(qualified_name, []).append(classification_name)
        print(f"[Atlas] Tagged {qualified_name} -> {classification_name}")

    def add_lineage(self, source_qn, target_qn, process_name):
        """Record lineage: source -> process -> target."""
        self._lineage[(source_qn, target_qn)] = {
            "process": process_name,
            "timestamp": "2026-06-20T10:00:00Z",
        }
        print(f"[Atlas] Lineage: {source_qn} -> {process_name} -> {target_qn}")

    def get_lineage(self, qualified_name, direction="BOTH"):
        """Print upstream (INPUT), downstream (OUTPUT), or BOTH directions."""
        print(f"\n[Atlas] Lineage for {qualified_name} ({direction}):")
        for (src, tgt), info in self._lineage.items():
            if direction in ("BOTH", "INPUT") and tgt == qualified_name:
                print(f"  Upstream: {src} -> {info['process']}")
            if direction in ("BOTH", "OUTPUT") and src == qualified_name:
                print(f"  Downstream: {tgt} via {info['process']}")

    def search(self, query):
        """Search entities by qualified name, attributes, or classification tag."""
        q = query.lower()
        results = []
        for qn, entity in self._entities.items():
            fields = [qn, str(entity["attributes"])] + self._classifications.get(qn, [])
            if any(q in text.lower() for text in fields):
                results.append(qn)
        return results


# Simulate cataloging DodaTech's data platform
atlas = AtlasClient()

# Create entities
atlas.create_entity("hive_table", "raw_db.clicks",
                    {"name": "clicks", "description": "Raw clickstream events",
                     "owner": "data_eng"})
atlas.create_entity("hive_table", "analytics.daily_metrics",
                    {"name": "daily_metrics", "description": "Aggregated daily metrics",
                     "owner": "analytics"})
atlas.create_entity("process", "etl.clean_clicks",
                    {"name": "clean_clicks_job", "description": "Cleans raw clicks",
                     "owner": "data_eng"})

# Add classifications
atlas.add_classification("raw_db.clicks", "PII")  # Contains user IDs
atlas.add_classification("analytics.daily_metrics", "BI_Ready")

# Add lineage
atlas.add_lineage("raw_db.clicks", "analytics.daily_metrics", "etl.clean_clicks")

# Query (the process entity also matches 'clicks')
atlas.get_lineage("analytics.daily_metrics", "INPUT")
print(f"\nSearch 'clicks': {atlas.search('clicks')}")
```

Expected output:

```text
[Atlas] Created hive_table: raw_db.clicks
[Atlas] Created hive_table: analytics.daily_metrics
[Atlas] Created process: etl.clean_clicks
[Atlas] Tagged raw_db.clicks -> PII
[Atlas] Tagged analytics.daily_metrics -> BI_Ready
[Atlas] Lineage: raw_db.clicks -> etl.clean_clicks -> analytics.daily_metrics

[Atlas] Lineage for analytics.daily_metrics (INPUT):
  Upstream: raw_db.clicks -> etl.clean_clicks

Search 'clicks': ['raw_db.clicks', 'etl.clean_clicks']
```

DataHub — Real-Time Metadata Platform
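DataHub, originally built by LinkedIn, is a metadata platform that collects metadata from many sources (dbt, Airflow, Kafka, Spark, and databases and warehouses such as Hive and Snowflake) and provides real-time search, lineage, and governance. Metadata can arrive through scheduled pull-based ingestion or be pushed as change events as soon as something changes, which is what keeps the catalog current.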
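DataHub identifies every entity by a URN. The `DataHubClient` simulation in this section uses shortened URNs such as `urn:li:dataset:raw_clicks` for readability. Real dataset URNs embed the platform, the dataset name, and the environment. The DataHub Python SDK builds them with `datahub.emitter.mce_builder.make_dataset_urn`. The sketch below re-implements only the string format so it runs without the SDK installed. `parse_dataset_urn` is a hypothetical helper.

```python
import re

# Dataset URNs look like urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
_DATASET_URN = re.compile(r"^urn:li:dataset:\(urn:li:dataPlatform:([^,]+),(.+),(\w+)\)$")


def make_dataset_urn(platform, name, env="PROD"):
    """Build a dataset URN in the same shape as the SDK's make_dataset_urn."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def parse_dataset_urn(urn):
    """Split a dataset URN back into (platform, name, env)."""
    match = _DATASET_URN.match(urn)
    if match is None:
        raise ValueError(f"not a dataset URN: {urn!r}")
    return match.groups()


urn = make_dataset_urn("hive", "raw_db.clicks")
print(urn)                     # urn:li:dataset:(urn:li:dataPlatform:hive,raw_db.clicks,PROD)
print(parse_dataset_urn(urn))  # ('hive', 'raw_db.clicks', 'PROD')
```

Because the platform and environment are part of the identifier, the same table name in dev and prod, or in Hive and Snowflake, never collides.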
```python
# Simulating DataHub metadata ingestion, lineage, and search (in memory).
# URNs are shortened for readability.
class DataHubClient:
    """Simulate DataHub metadata ingestion and search."""

    def __init__(self):
        self._datasets = {}
        self._pipelines = {}
        self._lineage_edges = []   # dataset-level: (source_urn, target_urn)
        self._column_lineage = []  # column-level: ("dataset.col", "dataset.col")

    def _name(self, urn):
        """Readable dataset name for a URN (falls back to the URN itself)."""
        return self._datasets.get(urn, {}).get("name", urn)

    def ingest_dataset(self, urn, name, platform, schema_fields):
        """Register a dataset with its schema."""
        self._datasets[urn] = {
            "name": name,
            "platform": platform,
            "schema": schema_fields,
            "tags": [],
            "ownership": None,
        }
        print(f"[DataHub] Ingested dataset: {name} ({platform})")

    def ingest_pipeline(self, urn, name):
        """Register a data pipeline."""
        self._pipelines[urn] = {"name": name, "runs": 0}

    def add_lineage(self, source_urn, target_urn):
        """Record dataset-level lineage."""
        self._lineage_edges.append((source_urn, target_urn))
        print(f"[DataHub] Lineage: {self._name(source_urn)} -> {self._name(target_urn)}")

    def add_column_lineage(self, source_urn, source_col, target_urn, target_col):
        """Record column-level lineage, kept apart from dataset-level edges."""
        src = f"{self._name(source_urn)}.{source_col}"
        tgt = f"{self._name(target_urn)}.{target_col}"
        self._column_lineage.append((src, tgt))
        print(f"[DataHub] Column lineage: {tgt} <- {src}")

    def search(self, query):
        """Search datasets by name or schema field."""
        q = query.lower()
        results = []
        for ds in self._datasets.values():
            if q in ds["name"].lower():
                results.append(ds["name"])
            for field in ds["schema"]:
                if q in field.get("name", "").lower():
                    results.append(f"{ds['name']}.{field['name']}")
        return results

    def impact_analysis(self, urn):
        """Find all downstream datasets, following lineage edges transitively."""
        print(f"\n[DataHub] Impact analysis for {self._name(urn)}:")
        affected, seen, frontier = [], {urn}, [urn]
        while frontier:
            current = frontier.pop()
            for src, tgt in self._lineage_edges:
                if src == current and tgt not in seen:
                    seen.add(tgt)
                    frontier.append(tgt)
                    affected.append(self._name(tgt))
                    print(f"  ⚠ Downstream: {self._name(tgt)}")
        return affected


# Simulate
dh = DataHubClient()
dh.ingest_dataset("urn:li:dataset:raw_clicks", "raw_clicks", "hive",
                  [{"name": "user_id", "type": "string"},
                   {"name": "page_url", "type": "string"},
                   {"name": "click_time", "type": "timestamp"}])
dh.ingest_dataset("urn:li:dataset:clean_clicks", "clean_clicks", "hive",
                  [{"name": "user_id", "type": "string"},
                   {"name": "page_path", "type": "string"},
                   {"name": "event_time", "type": "timestamp"}])
dh.ingest_dataset("urn:li:dataset:daily_metrics", "daily_metrics", "hive",
                  [{"name": "date", "type": "date"},
                   {"name": "page_views", "type": "bigint"},
                   {"name": "unique_users", "type": "bigint"}])
dh.add_lineage("urn:li:dataset:raw_clicks", "urn:li:dataset:clean_clicks")
dh.add_lineage("urn:li:dataset:clean_clicks", "urn:li:dataset:daily_metrics")
dh.add_column_lineage("urn:li:dataset:raw_clicks", "user_id",
                      "urn:li:dataset:clean_clicks", "user_id")
print(f"\nSearch 'clicks': {dh.search('clicks')}")
dh.impact_analysis("urn:li:dataset:clean_clicks")
```

Expected output:

```text
[DataHub] Ingested dataset: raw_clicks (hive)
[DataHub] Ingested dataset: clean_clicks (hive)
[DataHub] Ingested dataset: daily_metrics (hive)
[DataHub] Lineage: raw_clicks -> clean_clicks
[DataHub] Lineage: clean_clicks -> daily_metrics
[DataHub] Column lineage: clean_clicks.user_id <- raw_clicks.user_id

Search 'clicks': ['raw_clicks', 'clean_clicks']

[DataHub] Impact analysis for clean_clicks:
  ⚠ Downstream: daily_metrics
```

Amundsen — Data Discovery Focus
Amundsen, originally built by Lyft, focuses on data discovery with a Google-like search experience, table previews, and popularity signals.
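The `AmundsenClient` simulation in this section counts every usage event equally and ignores who ran the query. As a result, one scheduled job hammering a scratch table can outrank a table that many people actually use. The sketch below shows one hypothetical alternative, *not* Amundsen's actual ranking formula. It counts distinct users and dampens raw query volume with `log1p`. The `PopularityTracker` class is illustrative only.

```python
import math
from collections import defaultdict


class PopularityTracker:
    """Hypothetical ranking signal: distinct users outweigh repeated queries."""

    def __init__(self):
        self._users = defaultdict(set)    # table key -> distinct users
        self._queries = defaultdict(int)  # table key -> total usage events

    def record_usage(self, table_key, user):
        self._users[table_key].add(user)
        self._queries[table_key] += 1

    def score(self, table_key):
        # log1p dampens raw volume so one chatty job cannot dominate ranking
        users = len(self._users.get(table_key, ()))
        return users + math.log1p(self._queries.get(table_key, 0))


popularity = PopularityTracker()
for _ in range(10):
    popularity.record_usage("hive://prod.tmp/scratch", "etl_bot")  # one heavy user
for user in ("alice", "bob", "carol"):
    popularity.record_usage("hive://prod.web/clicks", user)        # three real users
print(popularity.score("hive://prod.web/clicks") > popularity.score("hive://prod.tmp/scratch"))  # True
```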
```python
class AmundsenClient:
    """Simulate Amundsen data discovery."""

    def __init__(self):
        self._tables = {}
        self._popularity = {}

    def register_table(self, name, database, cluster, schema, description, tags=None):
        """Register a table for discovery."""
        key = f"{database}://{cluster}.{schema}/{name}"
        self._tables[key] = {
            "name": name,
            "database": database,
            "cluster": cluster,
            "schema": schema,
            "description": description,
            "tags": tags or [],
            "owners": [],
            "columns": [],
        }
        print(f"[Amundsen] Registered: {key}")
        return key

    def add_column(self, table_key, col_name, col_type, description):
        self._tables[table_key]["columns"].append({
            "name": col_name,
            "type": col_type,
            "description": description,
        })

    def record_usage(self, table_key, user):
        """Record table usage (boosts search ranking); user is not tracked here."""
        self._popularity[table_key] = self._popularity.get(table_key, 0) + 1

    def search(self, query):
        """Search with relevance scoring: name > description > tags, plus popularity."""
        results = []
        for key, table in self._tables.items():
            score = 0
            if query.lower() in table["name"].lower():
                score += 10
            if query.lower() in table["description"].lower():
                score += 5
            for tag in table["tags"]:
                if query.lower() in tag.lower():
                    score += 3
            if score > 0:
                results.append((score + self._popularity.get(key, 0),
                                table["name"], key))
        results.sort(reverse=True)
        return [(name, key) for _, name, key in results]


amundsen = AmundsenClient()
clicks = amundsen.register_table("clicks", "hive", "prod", "web",
                                 "Raw clickstream from Doda Browser",
                                 tags=["events", "web", "product"])
amundsen.add_column(clicks, "user_id", "string", "Anonymous user ID")
amundsen.add_column(clicks, "page_url", "string", "Full URL of page view")
amundsen.add_column(clicks, "click_time", "timestamp", "When the click occurred")
daily = amundsen.register_table("daily_metrics", "hive", "prod", "analytics",
                                "Aggregated daily page metrics",
                                tags=["aggregated", "kpi"])
amundsen.add_column(daily, "date", "date", "Calendar date")
amundsen.add_column(daily, "page_views", "bigint", "Total page views")
amundsen.record_usage(clicks, "alice")
amundsen.record_usage(clicks, "bob")
amundsen.record_usage(daily, "alice")
print("Search 'clicks':", [r[0] for r in amundsen.search("clicks")])
print("Search 'metrics':", [r[0] for r in amundsen.search("metrics")])
```

Expected output:

```text
[Amundsen] Registered: hive://prod.web/clicks
[Amundsen] Registered: hive://prod.analytics/daily_metrics
Search 'clicks': ['clicks']
Search 'metrics': ['daily_metrics']
```

Column-Level Lineage
Column-level lineage tracks the flow of individual columns through transformations, showing exactly how each field in the output was derived from source columns.
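Column mappings are rarely entered by hand. Lineage tools usually derive them by parsing the SQL that builds each table. Production tools rely on a full SQL parser (sqlglot is one open-source option). The sketch below uses regular expressions instead, to show the idea on a simple single-table `SELECT`. It assumes unqualified column names and no subqueries, CTEs, joins, or string literals. `split_top_level`, `column_lineage`, and `NON_COLUMNS` are hypothetical names.

```python
import re

# Words that can appear in select expressions but are not columns (deliberately incomplete).
NON_COLUMNS = {"distinct", "case", "when", "then", "else", "end", "and", "or", "not", "null", "is", "in"}


def split_top_level(select_list):
    """Split a SELECT list on commas that are not nested inside parentheses."""
    parts, current, depth = [], [], 0
    for ch in select_list:
        if ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
        if ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    parts.append("".join(current).strip())
    return parts


def column_lineage(sql, source_table, target_table):
    """Return (source_column, target_column, expression) edges for a simple SELECT."""
    match = re.search(r"select\s+(.*?)\s+from\s", sql, re.IGNORECASE | re.DOTALL)
    if match is None:
        raise ValueError("no SELECT ... FROM found")
    edges = []
    for item in split_top_level(match.group(1)):
        aliased = re.match(r"(.*?)\s+as\s+(\w+)$", item, re.IGNORECASE | re.DOTALL)
        expr, alias = (aliased.group(1), aliased.group(2)) if aliased else (item, item)
        # Identifiers not followed by "(" are treated as columns; COUNT( etc. are functions.
        idents = re.findall(r"\b([A-Za-z_]\w*)\b(?!\s*\()", expr)
        for col in sorted({i for i in idents if i.lower() not in NON_COLUMNS}):
            edges.append((f"{source_table}.{col}", f"{target_table}.{alias}", expr))
    return edges


sql = """
SELECT DATE(event_time)        AS date,
       COUNT(DISTINCT user_id) AS unique_users,
       COUNT(page_path)        AS page_views
FROM clean_clicks
GROUP BY DATE(event_time)
"""
for edge in column_lineage(sql, "clean_clicks", "daily_metrics"):
    print(edge)
```

Each printed tuple is one column-level edge, for example `('clean_clicks.user_id', 'daily_metrics.unique_users', 'COUNT(DISTINCT user_id)')`. These tuples have the same shape as the mappings recorded by hand in the tracker.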
```python
class ColumnLineageTracker:
    """Track column transformations through a pipeline."""

    def __init__(self):
        # Each edge: {"source": "table.col", "target": "table.col", "transform": str}
        self._edges = []

    def add_mapping(self, src_table, src_col, tgt_table, tgt_col, transform="direct"):
        """Record a column-level mapping."""
        self._edges.append({
            "source": f"{src_table}.{src_col}",
            "target": f"{tgt_table}.{tgt_col}",
            "transform": transform,
        })

    def upstream(self, table, column):
        """Find the columns this column is derived from (one hop)."""
        return [(e["source"], e["transform"]) for e in self._edges
                if e["target"] == f"{table}.{column}"]

    def downstream(self, table, column):
        """Find the columns derived from this column (one hop)."""
        return [(e["target"], e["transform"]) for e in self._edges
                if e["source"] == f"{table}.{column}"]

    def impact(self, table, column):
        """Full impact analysis: everything that breaks if this column changes."""
        print(f"\nImpact of changing {table}.{column}:")
        if not self.downstream(table, column):
            print("  No direct downstream dependencies")
        self._walk(table, column, depth=1, seen=set())

    def _walk(self, table, column, depth, seen):
        """Recursively print downstream columns, indenting one level per hop."""
        for dep, transform in self.downstream(table, column):
            if dep in seen:  # guard against cycles
                continue
            seen.add(dep)
            label = "Breaks" if depth == 1 else "Also"
            print(f"{'  ' * depth}⚠ {label}: {dep} (via {transform})")
            dep_table, dep_col = dep.rsplit(".", 1)
            self._walk(dep_table, dep_col, depth + 1, seen)


# Simulate column-level lineage for a data pipeline
tracker = ColumnLineageTracker()

# bronze -> silver
tracker.add_mapping("raw_clicks", "user_id", "clean_clicks", "user_id")
tracker.add_mapping("raw_clicks", "page_url", "clean_clicks", "page_path",
                    "regex: parse path from URL")
tracker.add_mapping("raw_clicks", "click_time", "clean_clicks", "event_time",
                    "cast: to_timestamp")

# silver -> gold
tracker.add_mapping("clean_clicks", "event_time", "daily_metrics", "date",
                    "cast: to_date")
tracker.add_mapping("clean_clicks", "user_id", "daily_metrics", "unique_users",
                    "count(distinct)")
tracker.add_mapping("clean_clicks", "page_path", "daily_metrics", "page_views",
                    "count(page_path) group by date")

# Query
print("Upstream of daily_metrics.unique_users:",
      tracker.upstream("daily_metrics", "unique_users"))
tracker.impact("raw_clicks", "user_id")
```

Expected output:

```text
Upstream of daily_metrics.unique_users: [('clean_clicks.user_id', 'count(distinct)')]

Impact of changing raw_clicks.user_id:
  ⚠ Breaks: clean_clicks.user_id (via direct)
    ⚠ Also: daily_metrics.unique_users (via count(distinct))
```

Impact Analysis
Impact analysis answers: “If I change this table or column, what downstream dashboards, pipelines, or reports will break?”
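Impact analysis pays off most when it runs automatically before a schema change is merged. Below is a minimal sketch with hypothetical helper names (`schema_diff`, `downstream_closure`, `check_schema_change`). It diffs a table's old and new `{column: type}` schemas. Then it walks the column-level lineage edges to list every downstream column that each dropped or retyped column would reach.

```python
def schema_diff(old, new):
    """Columns dropped or retyped between two {column: type} schemas."""
    dropped = sorted(set(old) - set(new))
    retyped = sorted(c for c in set(old) & set(new) if old[c] != new[c])
    return dropped, retyped


def downstream_closure(start, edges):
    """Every 'table.column' reachable from start along (source, target) edges."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        for src, tgt in edges:
            if src == node and tgt not in seen:
                seen.add(tgt)
                stack.append(tgt)
    return seen


def check_schema_change(table, old, new, edges):
    """Map each dropped or retyped column to the downstream columns it would break."""
    dropped, retyped = schema_diff(old, new)
    return {col: sorted(downstream_closure(f"{table}.{col}", edges)) for col in dropped + retyped}


edges = [
    ("raw_clicks.user_id", "clean_clicks.user_id"),
    ("clean_clicks.user_id", "daily_metrics.unique_users"),
    ("raw_clicks.page_url", "clean_clicks.page_path"),
]
old = {"user_id": "string", "page_url": "string", "click_time": "timestamp"}
new = {"user_id": "bigint", "page_url": "string"}  # click_time dropped, user_id retyped
report = check_schema_change("raw_clicks", old, new, edges)
print(report)
# {'click_time': [], 'user_id': ['clean_clicks.user_id', 'daily_metrics.unique_users']}
```

A CI job could fail the build whenever any list in the report is non-empty. Because the sketch compares only column names, a rename shows up as a drop.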
```python
class ImpactAnalyzer:
    """Impact analysis across tables, pipelines, and dashboards."""

    def __init__(self):
        self._tables = {}
        self._dashboards = {}
        self._pipelines = {}

    def add_table(self, name):
        # setdefault keeps existing links if a table is registered twice
        self._tables.setdefault(name, {"downstream_tables": [], "downstream_dashboards": []})

    def add_dashboard(self, name, chart_names):
        self._dashboards[name] = {"charts": chart_names}

    def add_pipeline(self, name, input_tables, output_tables):
        self._pipelines[name] = {"inputs": input_tables, "outputs": output_tables}
        for tbl in input_tables:
            self.add_table(tbl)
            self._tables[tbl]["downstream_tables"].extend(output_tables)

    def link_table_to_dashboard(self, table, dashboard):
        self.add_table(table)
        self._tables[table]["downstream_dashboards"].append(dashboard)

    def _all_downstream(self, table_name):
        """Every table reachable downstream of table_name, at any depth."""
        seen, stack = set(), [table_name]
        while stack:
            for child in self._tables.get(stack.pop(), {}).get("downstream_tables", []):
                if child not in seen:
                    seen.add(child)
                    stack.append(child)
        return seen

    def analyze(self, table_name):
        """Show everything that transitively depends on this table."""
        print(f"=== Impact Analysis: {table_name} ===")
        downstream = self._all_downstream(table_name)
        affected = {table_name} | downstream
        # Dashboards on any affected table break, not just those on table_name itself
        dashboards = {d for t in affected
                      for d in self._tables.get(t, {}).get("downstream_dashboards", [])}
        if downstream:
            print(f"📊 Downstream tables: {sorted(downstream)}")
        if dashboards:
            print(f"📈 Dashboards affected: {sorted(dashboards)}")
        for pipe_name, pipe in self._pipelines.items():
            if affected & set(pipe["inputs"]):
                print(f"🔧 Pipeline breakage: {pipe_name}")


# Simulate an enterprise data platform
ia = ImpactAnalyzer()
ia.add_table("raw_clicks")
ia.add_table("clean_clicks")
ia.add_table("daily_metrics")
ia.add_table("user_profiles")
ia.add_pipeline("click_cleaner", ["raw_clicks"], ["clean_clicks"])
ia.add_pipeline("daily_agg", ["clean_clicks"], ["daily_metrics"])
ia.add_dashboard("Executive Dashboard", ["DAU Trends", "Page Views by Day"])
ia.add_dashboard("Product Report", ["Top Pages", "User Growth"])
ia.link_table_to_dashboard("daily_metrics", "Executive Dashboard")
ia.link_table_to_dashboard("daily_metrics", "Product Report")
ia.analyze("clean_clicks")
```

Expected output:

```text
=== Impact Analysis: clean_clicks ===
📊 Downstream tables: ['daily_metrics']
📈 Dashboards affected: ['Executive Dashboard', 'Product Report']
🔧 Pipeline breakage: daily_agg
```

Common Mistakes
1. Catalog Without Automation
Manually registering tables guarantees stale metadata. Automate ingestion from Hive Metastore, AWS Glue, dbt, and Airflow.
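Automated ingestion means the catalog reads schemas from the system of record on a schedule instead of relying on someone to type them in. In production, the harvester reads sources such as the Hive Metastore, AWS Glue, or dbt artifacts. In this minimal sketch, SQLite from the Python standard library stands in for them. The `harvest_sqlite_metadata` function is hypothetical. Its output could feed a catalog's dataset and column registration calls.

```python
import sqlite3


def harvest_sqlite_metadata(conn):
    """Read table and column metadata directly from a live SQLite database."""
    catalog = {}
    tables = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
    ).fetchall()
    for (table,) in tables:
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
        columns = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
        catalog[table] = [
            {"name": name, "type": col_type, "nullable": not notnull}
            for _cid, name, col_type, notnull, _default, _pk in columns
        ]
    return catalog


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_orders (order_id INTEGER PRIMARY KEY, amount DECIMAL(10,2) NOT NULL)")
conn.execute("CREATE TABLE daily_revenue (revenue_date TEXT, total_revenue DECIMAL(10,2))")
for table, cols in harvest_sqlite_metadata(conn).items():
    print(table, [(c["name"], c["type"], c["nullable"]) for c in cols])
```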
2. No Column-Level Lineage
Table-level lineage tells you that clickstream feeds a dashboard. Column-level lineage tells you that clickstream.user_id feeds dashboard.unique_users, so you can see whether renaming or retyping a specific column will actually break anything downstream.
3. Ignoring Data Profiling
A catalog without row counts, null percentages, and freshness metrics is just a list of table names; users have no basis for deciding whether to trust the data.
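Basic profile statistics are cheap to compute during ingestion. Below is a minimal sketch over in-memory rows (a list of dicts), built around a hypothetical `profile` function. It reports row count, per-column null percentage and distinct count, and freshness as hours since the newest timestamp. A real profiler would usually push these aggregates down to the warehouse as SQL instead of pulling rows into Python.

```python
from datetime import datetime, timezone


def profile(rows, timestamp_col=None, now=None):
    """Row count, per-column null % and distinct count, plus optional freshness."""
    columns = sorted({key for row in rows for key in row})
    stats = {"row_count": len(rows), "columns": {}}
    for col in columns:
        values = [row.get(col) for row in rows]
        non_null = [v for v in values if v is not None]
        stats["columns"][col] = {
            "null_pct": round(100 * (len(values) - len(non_null)) / len(values), 1),
            "distinct": len(set(non_null)),
        }
    if timestamp_col:
        latest = max((r[timestamp_col] for r in rows if r.get(timestamp_col) is not None),
                     default=None)
        if latest is not None:
            now = now or datetime.now(timezone.utc)
            stats["freshness_hours"] = round((now - latest).total_seconds() / 3600, 1)
    return stats


rows = [
    {"user_id": "u1", "page_url": "/a", "click_time": datetime(2026, 6, 20, 8, tzinfo=timezone.utc)},
    {"user_id": "u2", "page_url": None, "click_time": datetime(2026, 6, 20, 9, tzinfo=timezone.utc)},
    {"user_id": "u1", "page_url": "/b", "click_time": datetime(2026, 6, 20, 10, tzinfo=timezone.utc)},
    {"user_id": None, "page_url": "/a", "click_time": datetime(2026, 6, 20, 6, tzinfo=timezone.utc)},
]
stats = profile(rows, timestamp_col="click_time",
                now=datetime(2026, 6, 20, 12, tzinfo=timezone.utc))
print(stats["row_count"], stats["columns"]["user_id"], stats["freshness_hours"])
# 4 {'null_pct': 25.0, 'distinct': 2} 2.0
```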
4. Not Tagging Sensitivity
Without PII/PHI/PCI classifications on columns, compliance audits require manual review of every dataset.
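A common first step is to suggest sensitivity tags automatically and have a data steward confirm them before they are applied as Atlas classifications or DataHub tags. The sketch below uses only column-name rules. The rules and tag names (`PII_EMAIL` and the like) are hypothetical. Name matching misses sensitive data in badly named columns, so real classifiers usually also sample values.

```python
import re

# Hypothetical name-based rules; production classifiers usually also sample values.
PII_RULES = {
    "PII_EMAIL": re.compile(r"e_?mail", re.IGNORECASE),
    "PII_PHONE": re.compile(r"phone|mobile", re.IGNORECASE),
    "PII_USER_ID": re.compile(r"(^|_)user_?id$", re.IGNORECASE),
    "PII_IP": re.compile(r"(^|_)ip(_?addr(ess)?)?$", re.IGNORECASE),
}


def suggest_classifications(columns):
    """Return {column: [tags]} for column names that match a PII rule."""
    suggestions = {}
    for col in columns:
        tags = [tag for tag, pattern in PII_RULES.items() if pattern.search(col)]
        if tags:
            suggestions[col] = tags
    return suggestions


print(suggest_classifications(["user_id", "page_url", "click_time", "client_ip", "contact_email"]))
# {'user_id': ['PII_USER_ID'], 'client_ip': ['PII_IP'], 'contact_email': ['PII_EMAIL']}
```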
5. One Catalog to Rule All
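Large organizations often end up with several catalogs: Atlas for Hadoop governance, DataHub for streaming metadata, Amundsen for discovery. Expecting one tool to cover every need is unrealistic, so plan a federated approach: bridge the catalogs with a custom sync, or consolidate their metadata in a platform such as OpenMetadata.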
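The hard part of a custom sync is that each catalog names the same dataset differently: an Atlas qualified name, a DataHub URN, an Amundsen key. The sketch below shows one way to handle that. It assumes all keys have already been normalized to a hypothetical `platform.schema.table` form, and it includes a converter for Amundsen's `database://cluster.schema/table` keys. Merge precedence is a design choice made here: the first catalog listed wins on conflicting fields, later ones only fill gaps, and tags are unioned. Both function names are hypothetical.

```python
def amundsen_key_to_common(key):
    """'hive://prod.web/clicks' -> 'hive.web.clicks' (drops the cluster)."""
    database, rest = key.split("://", 1)
    cluster_schema, table = rest.split("/", 1)
    schema = cluster_schema.split(".", 1)[1]
    return f"{database}.{schema}.{table}"


def merge_catalog_records(*sources):
    """Merge per-dataset metadata from several catalogs into one view.

    Each source is (catalog_name, {common_key: {field: value}}). Earlier sources
    win on conflicting fields; later ones only fill gaps. Tags are unioned.
    """
    merged = {}
    for catalog_name, records in sources:
        for key, record in records.items():
            entry = merged.setdefault(key, {"sources": [], "tags": set()})
            entry["sources"].append(catalog_name)
            for field, value in record.items():
                if field == "tags":
                    entry["tags"].update(value)
                else:
                    entry.setdefault(field, value)
    return merged


atlas_view = ("atlas", {"hive.web.clicks": {"owner": "data_eng", "tags": ["PII"]}})
amundsen_view = ("amundsen", {
    amundsen_key_to_common("hive://prod.web/clicks"): {
        "owner": "web_team", "description": "Raw clickstream from Doda Browser",
        "tags": ["events", "web"]},
})
merged = merge_catalog_records(atlas_view, amundsen_view)
print(merged["hive.web.clicks"]["owner"], sorted(merged["hive.web.clicks"]["tags"]))
# data_eng ['PII', 'events', 'web']
```

Listing the governance catalog first means its ownership and classification data override the discovery tool's. The discovery tool still contributes descriptions that the governance catalog lacks.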
Practice Questions
What is a data catalog and why do enterprises need one? A data catalog indexes datasets, lineage, and metadata for discovery, governance, and impact analysis. Enterprises need one to keep their data from turning into a “data swamp” and to comply with regulations.
How does column-level lineage differ from table-level lineage? Column-level tracks individual field transformations (source column → target column), enabling precise impact analysis. Table-level shows only table dependencies.
What is the primary focus of each: Apache Atlas vs DataHub vs Amundsen? Atlas: governance and classification. DataHub: real-time metadata with rich lineage. Amundsen: data discovery and search.
What is impact analysis and when would you use it? Impact analysis determines what breaks if a dataset or column changes. Used before schema changes, pipeline modifications, or deletions.
Challenge: Design a catalog strategy for a company with Hadoop, Snowflake, Kafka, and dbt. Which catalog(s) would you use and how would they integrate?
Mini Project: Lightweight Data Catalog
```python
# data_catalog.py
# Build a simple in-memory data catalog with lineage and search

class SimpleDataCatalog:
    def __init__(self):
        self.datasets = {}       # dataset name -> metadata
        self.columns = {}        # "dataset.column" -> column metadata
        self.lineage_edges = []  # column-level edges: {"source": ..., "target": ...}

    def add_dataset(self, name, description, owner):
        self.datasets[name] = {
            "description": description,
            "owner": owner,
            "columns": [],
            "tags": [],
        }

    def add_column(self, dataset, name, col_type, description, tags=None):
        col = {"name": name, "type": col_type,
               "description": description, "tags": tags or []}
        self.datasets[dataset]["columns"].append(col)
        self.columns[f"{dataset}.{name}"] = col

    def add_lineage(self, source_dataset, source_col, target_dataset, target_col):
        self.lineage_edges.append({
            "source": f"{source_dataset}.{source_col}",
            "target": f"{target_dataset}.{target_col}",
        })

    def search(self, query):
        """Match dataset names and column names (case-insensitive substring)."""
        q = query.lower()
        results = []
        for name, ds in self.datasets.items():
            if q in name.lower():
                results.append(("dataset", name, ds["description"]))
            for col in ds["columns"]:
                if q in col["name"].lower():
                    results.append(("column", f"{name}.{col['name']}", col["description"]))
        return results

    def lineage_for(self, dataset_name):
        """Column-level upstream and downstream edges touching a dataset."""
        result = {"upstream": [], "downstream": []}
        for edge in self.lineage_edges:
            src_ds = edge["source"].rsplit(".", 1)[0]
            tgt_ds = edge["target"].rsplit(".", 1)[0]
            if tgt_ds == dataset_name:
                result["upstream"].append(edge["source"])
            if src_ds == dataset_name:
                result["downstream"].append(edge["target"])
        return result


# Demo
cat = SimpleDataCatalog()
cat.add_dataset("raw_orders", "Raw orders from e-commerce", "data_eng")
cat.add_column("raw_orders", "order_id", "int", "Primary key")
cat.add_column("raw_orders", "amount", "decimal", "Order total")
cat.add_dataset("daily_revenue", "Aggregated daily revenue", "analytics")
cat.add_column("daily_revenue", "revenue_date", "date", "Calendar date")
cat.add_column("daily_revenue", "total_revenue", "decimal", "Sum of daily orders")
cat.add_lineage("raw_orders", "amount", "daily_revenue", "total_revenue")
print("Search 'revenue':", cat.search("revenue"))
print("\nLineage for daily_revenue:", cat.lineage_for("daily_revenue"))
```

Expected output:

```text
Search 'revenue': [('dataset', 'daily_revenue', 'Aggregated daily revenue'), ('column', 'daily_revenue.revenue_date', 'Calendar date'), ('column', 'daily_revenue.total_revenue', 'Sum of daily orders')]

Lineage for daily_revenue: {'upstream': ['raw_orders.amount'], 'downstream': []}
```
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.