Data Catalog & Lineage — Atlas, DataHub, Amundsen & Column-Level Lineage

DodaTech Updated Jun 20, 2026 11 min read

A data catalog is a metadata management platform that indexes datasets, tracks lineage from source to consumption, enables discovery, and powers impact analysis — the nervous system of a modern data platform.

What You’ll Learn

This tutorial covers data catalog fundamentals: Apache Atlas for governance, DataHub for real-time metadata, Amundsen for data discovery, column-level lineage tracking, impact analysis workflows, and how catalogs integrate with data lakes, warehouses, and processing engines.

Why It Matters

Enterprises with thousands of datasets need a way to answer: Where did this data come from? Who depends on it? What happens if I change this column? Without a catalog, data teams waste 30-40% of their time finding and understanding data. DodaTech uses catalogs to manage telemetry data across Doda Browser and threat intelligence in Durga Antivirus Pro.

Real-World Use

LinkedIn developed DataHub to track metadata across thousands of datasets, pipelines, and ML models. Uber built Databook for similar needs. Financial institutions use Apache Atlas with Apache Ranger for compliance-driven data governance (GDPR, CCPA, SOX).

    flowchart TD
    subgraph Sources
        A[Databases] --> D[Data Catalog]
        B[Data Lakes] --> D
        C[Streams/Kafka] --> D
    end
    subgraph Catalog[Data Catalog]
        D --> E[Discovery]
        D --> F[Lineage]
        D --> G[Governance]
        D --> H[Glossary]
    end
    subgraph Consumers
        E --> I[Data Scientists]
        F --> J[Data Engineers]
        G --> K[Compliance]
        H --> L[Business Analysts]
    end
  

Why Data Catalogs Matter

Think of a data catalog as the card catalog of a library. Without it, finding the right book means wandering through shelves at random. With it, you find exactly what you need in seconds — plus you see which shelves connect to each other.

Key Functions

| Function | Description | Stakeholder |
|---|---|---|
| Discovery | Find datasets by name, tag, domain | All data users |
| Lineage | Show data flow: source → transform → report | Engineers, auditors |
| Impact Analysis | What breaks if I change this column? | Engineers |
| Governance | Tags, classifications, access control | Compliance |
| Glossary | Business definitions for technical assets | Analysts |
| Profiling | Statistics: row count, null %, distinct values | All |

Apache Atlas — Governance-Focused Catalog

Apache Atlas is an open-source metadata and governance platform designed for Hadoop ecosystems. It provides data classification and lineage, and it supports policy-driven access control through its integration with Apache Ranger.

# Simulating Apache Atlas metadata operations
class AtlasClient:
    """Simulate interaction with Apache Atlas REST API."""

    def __init__(self, cluster_name="dodatech_cluster"):
        self.cluster = cluster_name
        self._entities = {}
        self._classifications = {}
        self._lineage = {}

    def create_entity(self, entity_type, qualified_name, attributes):
        """Register a metadata entity."""
        entity = {
            "typeName": entity_type,
            "attributes": {
                "qualifiedName": qualified_name,
                "name": attributes.get("name"),
                "description": attributes.get("description"),
                "owner": attributes.get("owner"),
                "createTime": "2026-06-20T10:00:00Z",
            },
        }
        self._entities[qualified_name] = entity
        print(f"[Atlas] Created {entity_type}: {qualified_name}")
        return qualified_name

    def add_classification(self, qualified_name, classification_name):
        """Tag an entity with a classification (PII, sensitive, etc.)."""
        if qualified_name not in self._classifications:
            self._classifications[qualified_name] = []
        self._classifications[qualified_name].append(classification_name)
        print(f"[Atlas] Tagged {qualified_name} -> {classification_name}")

    def add_lineage(self, source_qn, target_qn, process_name):
        """Record lineage: source -> process -> target."""
        self._lineage[(source_qn, target_qn)] = {
            "process": process_name,
            "timestamp": "2026-06-20T10:00:00Z",
        }
        print(f"[Atlas] Lineage: {source_qn} -> {process_name} -> {target_qn}")

    def get_lineage(self, qualified_name, direction="BOTH"):
        """Get upstream or downstream lineage."""
        print(f"\n[Atlas] Lineage for {qualified_name} ({direction}):")
        for (src, tgt), info in self._lineage.items():
            if direction in ("BOTH", "INPUT") and tgt == qualified_name:
                print(f"  Upstream: {src} -> {info['process']}")
            if direction in ("BOTH", "OUTPUT") and src == qualified_name:
                print(f"  Downstream: {tgt} via {info['process']}")

    def search(self, query):
        """Search entities by qualified name or attribute values (case-insensitive)."""
        results = []
        for qn, entity in self._entities.items():
            if query.lower() in qn.lower() or query.lower() in str(entity).lower():
                results.append(qn)
        return results

# Simulate cataloging DodaTech's data platform
atlas = AtlasClient()

# Create entities
atlas.create_entity("hive_table", "raw_db.clicks",
                    {"name": "clicks", "description": "Raw clickstream events",
                     "owner": "data_eng"})
atlas.create_entity("hive_table", "analytics.daily_metrics",
                    {"name": "daily_metrics", "description": "Aggregated daily metrics",
                     "owner": "analytics"})
atlas.create_entity("process", "etl.clean_clicks",
                    {"name": "clean_clicks_job", "description": "Cleans raw clicks",
                     "owner": "data_eng"})

# Add classifications
atlas.add_classification("raw_db.clicks", "PII")       # Contains user IDs
atlas.add_classification("analytics.daily_metrics", "BI_Ready")

# Add lineage
atlas.add_lineage("raw_db.clicks", "analytics.daily_metrics", "etl.clean_clicks")

# Query
atlas.get_lineage("analytics.daily_metrics", "INPUT")
print(f"\nSearch 'clicks': {atlas.search('clicks')}")

Expected output:

[Atlas] Created hive_table: raw_db.clicks
[Atlas] Created hive_table: analytics.daily_metrics
[Atlas] Created process: etl.clean_clicks
[Atlas] Tagged raw_db.clicks -> PII
[Atlas] Tagged analytics.daily_metrics -> BI_Ready
[Atlas] Lineage: raw_db.clicks -> etl.clean_clicks -> analytics.daily_metrics

[Atlas] Lineage for analytics.daily_metrics (INPUT):
  Upstream: raw_db.clicks -> etl.clean_clicks

Search 'clicks': ['raw_db.clicks', 'etl.clean_clicks']
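
Atlas can also propagate classifications along lineage, so a tag such as PII follows the data into downstream tables. The following is a minimal sketch of that idea only, not the Atlas API: the function name `propagate_classifications`, the plain dictionaries, and the `staging.clean_clicks` table are assumptions made for illustration.

```python
from collections import deque

def propagate_classifications(edges, tags):
    """Copy each tag to every entity reachable downstream of its origin.

    edges: list of (source, target) qualified-name pairs.
    tags:  dict mapping qualified name -> set of classification names.
    Returns a new dict; the inputs are not modified.
    """
    children = {}
    for src, tgt in edges:
        children.setdefault(src, []).append(tgt)

    result = {name: set(names) for name, names in tags.items()}
    for origin, names in tags.items():
        # Breadth-first walk; `seen` guards against cycles in the lineage graph.
        seen, queue = {origin}, deque([origin])
        while queue:
            node = queue.popleft()
            for child in children.get(node, []):
                if child not in seen:
                    seen.add(child)
                    result.setdefault(child, set()).update(names)
                    queue.append(child)
    return result

# Hypothetical three-table chain for illustration
edges = [("raw_db.clicks", "staging.clean_clicks"),
         ("staging.clean_clicks", "analytics.daily_metrics")]
tags = {"raw_db.clicks": {"PII"}, "analytics.daily_metrics": {"BI_Ready"}}

propagated = propagate_classifications(edges, tags)
print(sorted(propagated["analytics.daily_metrics"]))  # ['BI_Ready', 'PII']
```

Returning a new dictionary keeps the explicitly assigned tags separate from the inherited ones, which makes it easier to audit why a table carries a given classification.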

DataHub — Real-Time Metadata Platform

DataHub, originally built by LinkedIn, is a metadata platform that ingests metadata from various sources (dbt, Airflow, Kafka, Spark, Tableau) and provides real-time search, lineage, and governance.

class DataHubClient:
    """Simulate DataHub metadata ingestion and search."""

    def __init__(self):
        self._datasets = {}
        self._pipelines = {}
        self._lineage_edges = []
        self._docs = {}

    def ingest_dataset(self, urn, name, platform, schema_fields):
        """Register a dataset with its schema."""
        self._datasets[urn] = {
            "name": name,
            "platform": platform,
            "schema": schema_fields,
            "tags": [],
            "ownership": None,
        }
        print(f"[DataHub] Ingested dataset: {name} ({platform})")

    def ingest_pipeline(self, urn, name):
        """Register a data pipeline."""
        self._pipelines[urn] = {"name": name, "runs": 0}

    def add_lineage(self, source_urn, target_urn):
        """Record dataset-level lineage."""
        self._lineage_edges.append((source_urn, target_urn))
        src_name = self._datasets.get(source_urn, {}).get("name", source_urn)
        tgt_name = self._datasets.get(target_urn, {}).get("name", target_urn)
        print(f"[DataHub] Lineage: {src_name} -> {tgt_name}")

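    def add_column_lineage(self, dataset_urn, source_col, target_col):
        """Record column-level lineage on the target dataset's record."""
        # Stored on the dataset itself rather than in _lineage_edges, which
        # holds only (source_urn, target_urn) tuples that impact_analysis unpacks.
        dataset = self._datasets.get(dataset_urn)
        if dataset is not None:
            dataset.setdefault("column_lineage", []).append((source_col, target_col))
        print(f"[DataHub] Column lineage: {dataset_urn}.{target_col} <- {source_col}")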

    def search(self, query):
        """Search datasets by name or schema field."""
        results = []
        for urn, ds in self._datasets.items():
            if query.lower() in ds["name"].lower():
                results.append(ds["name"])
            for field in ds["schema"]:
                if query.lower() in field.get("name", "").lower():
                    results.append(f"{ds['name']}.{field['name']}")
        return results

    def impact_analysis(self, urn):
        """Find all downstream dependencies."""
        print(f"\n[DataHub] Impact analysis for {self._datasets.get(urn, {}).get('name', urn)}:")
        affected = []
        for src, tgt in self._lineage_edges:
            if src == urn:
                downstream = self._datasets.get(tgt, {}).get("name", tgt)
                affected.append(downstream)
                print(f"  ⚠ Downstream: {downstream}")
        return affected

# Simulate
dh = DataHubClient()
dh.ingest_dataset("urn:li:dataset:raw_clicks", "raw_clicks", "hive", 
                  [{"name": "user_id", "type": "string"},
                   {"name": "page_url", "type": "string"},
                   {"name": "click_time", "type": "timestamp"}])
dh.ingest_dataset("urn:li:dataset:clean_clicks", "clean_clicks", "hive",
                  [{"name": "user_id", "type": "string"},
                   {"name": "page_path", "type": "string"},
                   {"name": "event_time", "type": "timestamp"}])
dh.ingest_dataset("urn:li:dataset:daily_metrics", "daily_metrics", "hive",
                  [{"name": "date", "type": "date"},
                   {"name": "page_views", "type": "bigint"},
                   {"name": "unique_users", "type": "bigint"}])

dh.add_lineage("urn:li:dataset:raw_clicks", "urn:li:dataset:clean_clicks")
dh.add_lineage("urn:li:dataset:clean_clicks", "urn:li:dataset:daily_metrics")
dh.add_column_lineage("urn:li:dataset:clean_clicks", "user_id", "user_id")

print(f"\nSearch 'clicks': {dh.search('clicks')}")
dh.impact_analysis("urn:li:dataset:clean_clicks")

Expected output:

[DataHub] Ingested dataset: raw_clicks (hive)
[DataHub] Ingested dataset: clean_clicks (hive)
[DataHub] Ingested dataset: daily_metrics (hive)
[DataHub] Lineage: raw_clicks -> clean_clicks
[DataHub] Lineage: clean_clicks -> daily_metrics
[DataHub] Column lineage: urn:li:dataset:clean_clicks.user_id <- user_id

Search 'clicks': ['raw_clicks', 'clean_clicks']

[DataHub] Impact analysis for clean_clicks:
  ⚠ Downstream: daily_metrics
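
The URNs in the simulation are shortened for readability. A real DataHub dataset URN embeds the platform, the dataset name, and the environment, in the form `urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)`. The sketch below builds and parses that form; the helper names `build_dataset_urn` and `parse_dataset_urn` are hypothetical and are not part of the DataHub SDK, and the dataset name `web.raw_clicks` is made up for the example.

```python
import re

# Pattern for the dataset URN layout described above
_DATASET_URN = re.compile(
    r"^urn:li:dataset:\(urn:li:dataPlatform:([^,]+),([^,]+),([^)]+)\)$"
)

def build_dataset_urn(platform, name, env="PROD"):
    """Assemble a dataset URN from its three parts (hypothetical helper)."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

def parse_dataset_urn(urn):
    """Split a dataset URN back into platform, name, and environment."""
    match = _DATASET_URN.match(urn)
    if match is None:
        raise ValueError(f"Not a dataset URN: {urn}")
    platform, name, env = match.groups()
    return {"platform": platform, "name": name, "env": env}

urn = build_dataset_urn("hive", "web.raw_clicks")
print(urn)
print(parse_dataset_urn(urn))
```

Because the platform and environment are part of the identifier, the same table name on two platforms (or in PROD and DEV) resolves to two distinct entities in the catalog.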

Amundsen — Data Discovery Focus

Amundsen, originally built by Lyft, focuses on data discovery with a Google-like search experience, table previews, and popularity signals.

class AmundsenClient:
    """Simulate Amundsen data discovery."""

    def __init__(self):
        self._tables = {}
        self._popularity = {}

    def register_table(self, name, database, cluster, schema, description, tags=None):
        """Register a table for discovery."""
        key = f"{database}://{cluster}.{schema}/{name}"
        self._tables[key] = {
            "name": name,
            "database": database,
            "cluster": cluster,
            "schema": schema,
            "description": description,
            "tags": tags or [],
            "owners": [],
            "columns": [],
        }
        print(f"[Amundsen] Registered: {key}")
        return key

    def add_column(self, table_key, col_name, col_type, description):
        self._tables[table_key]["columns"].append({
            "name": col_name,
            "type": col_type,
            "description": description,
        })

    def record_usage(self, table_key, user):
        """Record table usage (boosts search ranking)."""
        self._popularity[table_key] = self._popularity.get(table_key, 0) + 1

    def search(self, query):
        """Search with relevance scoring."""
        results = []
        for key, table in self._tables.items():
            score = 0
            if query.lower() in table["name"].lower():
                score += 10
            if query.lower() in table["description"].lower():
                score += 5
            for tag in table["tags"]:
                if query.lower() in tag.lower():
                    score += 3
            if score > 0:
                results.append((score + self._popularity.get(key, 0),
                                table["name"], key))
        results.sort(reverse=True)
        return [(name, key) for _, name, key in results]

amundsen = AmundsenClient()
clicks = amundsen.register_table("clicks", "hive", "prod", "web", 
                                 "Raw clickstream from Doda Browser",
                                 tags=["events", "web", "product"])
amundsen.add_column(clicks, "user_id", "string", "Anonymous user ID")
amundsen.add_column(clicks, "page_url", "string", "Full URL of page view")
amundsen.add_column(clicks, "click_time", "timestamp", "When the click occurred")

daily = amundsen.register_table("daily_metrics", "hive", "prod", "analytics",
                                "Aggregated daily page metrics",
                                tags=["aggregated", "kpi"])
amundsen.add_column(daily, "date", "date", "Calendar date")
amundsen.add_column(daily, "page_views", "bigint", "Total page views")

amundsen.record_usage(clicks, "alice")
amundsen.record_usage(clicks, "bob")
amundsen.record_usage(daily, "alice")

print("Search 'clicks':", [r[0] for r in amundsen.search("clicks")])
print("Search 'metrics':", [r[0] for r in amundsen.search("metrics")])

Expected output:

[Amundsen] Registered: hive://prod.web/clicks
[Amundsen] Registered: hive://prod.analytics/daily_metrics
Search 'clicks': ['clicks']
Search 'metrics': ['daily_metrics']

Column-Level Lineage

Column-level lineage tracks the flow of individual columns through transformations, showing exactly how each field in the output was derived from source columns.

class ColumnLineageTracker:
    """Track column transformations through a pipeline."""

    def __init__(self):
        self._edges = []  # list of dicts with "source", "target", and "transform" keys

    def add_mapping(self, src_table, src_col, tgt_table, tgt_col, transform="direct"):
        """Record a column-level mapping."""
        self._edges.append({
            "source": f"{src_table}.{src_col}",
            "target": f"{tgt_table}.{tgt_col}",
            "transform": transform,
        })

    def upstream(self, table, column):
        """Find what a column depends on."""
        deps = []
        for e in self._edges:
            if e["target"] == f"{table}.{column}":
                deps.append((e["source"], e["transform"]))
        return deps

    def downstream(self, table, column):
        """Find what depends on this column."""
        deps = []
        for e in self._edges:
            if e["source"] == f"{table}.{column}":
                deps.append((e["target"], e["transform"]))
        return deps

    def impact(self, table, column):
        """Full impact analysis: what breaks if this column changes."""
        print(f"\nImpact of changing {table}.{column}:")
        downstream_deps = self.downstream(table, column)
        if not downstream_deps:
            print("  No direct downstream dependencies")
        for dep, transform in downstream_deps:
            print(f"  ⚠ Breaks: {dep} (via {transform})")
            # Follows one further hop only (not fully recursive)
            tgt_table, tgt_col = dep.split(".")
            sub_deps = self.downstream(tgt_table, tgt_col)
            for sub_dep, sub_transform in sub_deps:
                print(f"    ⚠ Also: {sub_dep} (via {sub_transform})")

# Simulate column-level lineage for a data pipeline
tracker = ColumnLineageTracker()

# bronze -> silver
tracker.add_mapping("raw_clicks", "user_id", "clean_clicks", "user_id")
tracker.add_mapping("raw_clicks", "page_url", "clean_clicks", "page_path", 
                    "regex: parse path from URL")
tracker.add_mapping("raw_clicks", "click_time", "clean_clicks", "event_time",
                    "cast: to_timestamp")

# silver -> gold
tracker.add_mapping("clean_clicks", "user_id", "daily_metrics", "unique_users",
                    "count(distinct)")
tracker.add_mapping("clean_clicks", "page_path", "daily_metrics", "page_views",
                    "count(*) group by page_path")

# Query
print("Upstream of daily_metrics.unique_users:", 
      tracker.upstream("daily_metrics", "unique_users"))
tracker.impact("raw_clicks", "user_id")

Expected output:

Upstream of daily_metrics.unique_users: [('clean_clicks.user_id', 'count(distinct)')]

Impact of changing raw_clicks.user_id:
  ⚠ Breaks: clean_clicks.user_id (via direct)
    ⚠ Also: daily_metrics.unique_users (via count(distinct))
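
The `impact` method above stops after two hops. For deeper pipelines, a breadth-first traversal reaches every dependent column regardless of depth. This is a minimal sketch under simplified assumptions: edges are plain `(source, target)` string pairs, the function name `transitive_downstream` is hypothetical, and the `exec_dashboard.dau` column is invented to show a third hop.

```python
from collections import deque

def transitive_downstream(edges, start):
    """Return (column, depth) pairs for every column reachable from `start`."""
    children = {}
    for source, target in edges:
        children.setdefault(source, []).append(target)

    seen = {start}                 # guards against cycles
    queue = deque([(start, 0)])
    reached = []
    while queue:
        column, depth = queue.popleft()
        for child in children.get(column, []):
            if child not in seen:
                seen.add(child)
                reached.append((child, depth + 1))
                queue.append((child, depth + 1))
    return reached

edges = [
    ("raw_clicks.user_id", "clean_clicks.user_id"),
    ("clean_clicks.user_id", "daily_metrics.unique_users"),
    ("daily_metrics.unique_users", "exec_dashboard.dau"),  # hypothetical third hop
]
for column, depth in transitive_downstream(edges, "raw_clicks.user_id"):
    print(f"{'  ' * depth}{column} (hop {depth})")
```

Tracking the hop count alongside each column makes it possible to triage: direct dependents usually need a code change, while distant ones may only need a re-run.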

Impact Analysis

Impact analysis answers: “If I change this table or column, what downstream dashboards, pipelines, or reports will break?”

class ImpactAnalyzer:
    """Full impact analysis across datasets, dashboards, and pipelines."""

    def __init__(self):
        self._tables = {}
        self._dashboards = {}
        self._pipelines = {}

    def add_table(self, name):
        self._tables[name] = {"downstream_tables": [], "downstream_dashboards": []}

    def add_dashboard(self, name, chart_names):
        self._dashboards[name] = {"charts": chart_names}

    def add_pipeline(self, name, input_tables, output_tables):
        self._pipelines[name] = {"inputs": input_tables, "outputs": output_tables}
        for tbl in input_tables:
            if tbl in self._tables:
                self._tables[tbl]["downstream_tables"].extend(output_tables)

    def link_table_to_dashboard(self, table, dashboard):
        if table in self._tables:
            self._tables[table]["downstream_dashboards"].append(dashboard)

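    def analyze(self, table_name):
        """Show everything that depends on this table, directly or transitively."""
        print(f"\n=== Impact Analysis: {table_name} ===")

        # Walk downstream tables breadth-first so that dashboards fed by
        # indirect dependents (clean_clicks -> daily_metrics) are found too.
        downstream, queue = set(), [table_name]
        while queue:
            current = queue.pop(0)
            for child in self._tables.get(current, {}).get("downstream_tables", []):
                if child not in downstream:
                    downstream.add(child)
                    queue.append(child)

        dashboards = set()
        for tbl in {table_name} | downstream:
            dashboards.update(self._tables.get(tbl, {}).get("downstream_dashboards", []))

        if downstream:
            print(f"\n📊 Downstream tables: {downstream}")
        if dashboards:
            print(f"📈 Dashboards affected: {dashboards}")

        for pipe_name, pipe in self._pipelines.items():
            if table_name in pipe["inputs"]:
                print(f"🔧 Pipeline breakage: {pipe_name}")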

# Simulate an enterprise data platform
ia = ImpactAnalyzer()
ia.add_table("raw_clicks")
ia.add_table("clean_clicks")
ia.add_table("daily_metrics")
ia.add_table("user_profiles")

ia.add_pipeline("click_cleaner", ["raw_clicks"], ["clean_clicks"])
ia.add_pipeline("daily_agg", ["clean_clicks"], ["daily_metrics"])

ia.add_dashboard("Executive Dashboard", ["DAU Trends", "Page Views by Day"])
ia.add_dashboard("Product Report", ["Top Pages", "User Growth"])

ia.link_table_to_dashboard("daily_metrics", "Executive Dashboard")
ia.link_table_to_dashboard("daily_metrics", "Product Report")

ia.analyze("clean_clicks")

Expected output:

=== Impact Analysis: clean_clicks ===

📊 Downstream tables: {'daily_metrics'}
📈 Dashboards affected: {'Executive Dashboard', 'Product Report'}
🔧 Pipeline breakage: daily_agg

Common Mistakes

1. Catalog Without Automation

Manually registering tables guarantees stale metadata. Automate ingestion from Hive Metastore, AWS Glue, dbt, and Airflow.
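
As an illustration of automated ingestion, the sketch below harvests table and column metadata directly from a database's own system catalog instead of relying on hand-entered records. SQLite stands in here for a real source such as Hive Metastore or AWS Glue, and the function name `harvest_schema` is hypothetical.

```python
import sqlite3

def harvest_schema(conn):
    """Return {table_name: [(column_name, declared_type), ...]} for a SQLite DB."""
    catalog = {}
    tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    for (table,) in tables:
        # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
        columns = conn.execute(f'PRAGMA table_info("{table}")').fetchall()
        catalog[table] = [(col[1], col[2]) for col in columns]
    return catalog

# In-memory database standing in for a production source
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_clicks (user_id TEXT, page_url TEXT, click_time TEXT)")
conn.execute("CREATE TABLE daily_metrics (date TEXT, page_views INTEGER)")

for table, columns in harvest_schema(conn).items():
    print(table, columns)
```

Running a harvester like this on a schedule, and diffing its result against the catalog, is one way to detect added or dropped columns before users notice stale entries.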

2. No Column-Level Lineage

Table-level lineage tells you clickstream → dashboard. Column-level tells you clickstream.user_id → dashboard.unique_users. The latter finds real breakage.

3. Ignoring Data Profiling

A catalog without row counts, null percentages, and freshness metrics is just a list of table names — not useful for trust.
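
A rough sketch of the profiling metrics mentioned above, computed over in-memory rows. The function names `profile_column` and `freshness_hours` and the sample rows are assumptions for illustration; a real profiler would typically push these aggregations down to the warehouse.

```python
from datetime import datetime, timezone

def profile_column(rows, column):
    """Compute row count, null percentage, and distinct count for one column."""
    values = [row.get(column) for row in rows]
    non_null = [v for v in values if v is not None]
    total = len(values)
    return {
        "row_count": total,
        "null_pct": round(100 * (total - len(non_null)) / total, 1) if total else 0.0,
        "distinct": len(set(non_null)),
    }

def freshness_hours(rows, ts_column, now):
    """Hours between `now` and the newest non-null timestamp, or None if empty."""
    timestamps = [row[ts_column] for row in rows if row.get(ts_column) is not None]
    if not timestamps:
        return None
    return (now - max(timestamps)).total_seconds() / 3600

rows = [
    {"user_id": "u1", "click_time": datetime(2026, 6, 20, 8, 0, tzinfo=timezone.utc)},
    {"user_id": None, "click_time": datetime(2026, 6, 20, 9, 0, tzinfo=timezone.utc)},
    {"user_id": "u1", "click_time": datetime(2026, 6, 20, 9, 30, tzinfo=timezone.utc)},
    {"user_id": "u2", "click_time": None},
]
now = datetime(2026, 6, 20, 12, 0, tzinfo=timezone.utc)

print(profile_column(rows, "user_id"))       # row_count 4, null_pct 25.0, distinct 2
print(freshness_hours(rows, "click_time", now))  # 2.5
```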

4. Not Tagging Sensitivity

Without PII/PHI/PCI classifications on columns, compliance audits require manual review of every dataset.
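
One way to bootstrap sensitivity tags is to suggest them from column names and have a reviewer confirm them. The sketch below is a deliberately simple, name-based heuristic: the rule list, the function names `suggest_tags` and `scan_table`, and the sample column names are all hypothetical, and name matching alone will miss sensitive data stored under innocuous names.

```python
import re

# Hypothetical name-based rules; real deployments tune these and usually
# combine them with content sampling and human review.
SENSITIVITY_RULES = [
    ("PII", re.compile(r"(^|_)(email|phone|ssn|user_id|ip_address|birth_date)($|_)")),
    ("PCI", re.compile(r"(^|_)(card_number|cvv|iban)($|_)")),
    ("PHI", re.compile(r"(^|_)(diagnosis|prescription|medical_record)($|_)")),
]

def suggest_tags(column_name):
    """Return the classifications whose pattern matches the column name."""
    name = column_name.lower()
    return [tag for tag, pattern in SENSITIVITY_RULES if pattern.search(name)]

def scan_table(columns):
    """Map each column that matched at least one rule to its suggested tags."""
    suggestions = {col: suggest_tags(col) for col in columns}
    return {col: tags for col, tags in suggestions.items() if tags}

print(scan_table(["user_id", "page_url", "billing_card_number", "click_time"]))
# {'user_id': ['PII'], 'billing_card_number': ['PCI']}
```

The patterns anchor on underscores or the ends of the name, so a column called `female` is not mistaken for an `email` field.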

5. One Catalog to Rule All

Large organizations often need a federated approach rather than a single tool: for example, Atlas for Hadoop governance, DataHub for streaming metadata, and Amundsen for discovery. Bridge them via OpenMetadata or a custom sync job.

Practice Questions

  1. What is a data catalog and why do enterprises need one? A data catalog indexes datasets, lineage, and metadata for discovery, governance, and impact analysis. Enterprises need one to avoid a “data swamp” and to comply with regulations.

  2. How does column-level lineage differ from table-level lineage? Column-level tracks individual field transformations (source column → target column), enabling precise impact analysis. Table-level shows only table dependencies.

  3. What is the primary focus of each: Apache Atlas vs DataHub vs Amundsen? Atlas: governance and classification. DataHub: real-time metadata with rich lineage. Amundsen: data discovery and search.

  4. What is impact analysis and when would you use it? Impact analysis determines what breaks if a dataset or column changes. Used before schema changes, pipeline modifications, or deletions.

  5. Challenge: Design a catalog strategy for a company with Hadoop, Snowflake, Kafka, and dbt. Which catalog(s) would you use and how would they integrate?

Mini Project: Lightweight Data Catalog

# data_catalog.py
# Build a simple in-memory data catalog with lineage and search

class SimpleDataCatalog:
    def __init__(self):
        self.datasets = {}
        self.columns = {}
        self.lineage_edges = []

    def add_dataset(self, name, description, owner):
        self.datasets[name] = {
            "description": description,
            "owner": owner,
            "columns": [],
            "tags": [],
        }

    def add_column(self, dataset, name, col_type, description, tags=None):
        col = {"name": name, "type": col_type, 
               "description": description, "tags": tags or []}
        self.datasets[dataset]["columns"].append(col)
        self.columns[f"{dataset}.{name}"] = col

    def add_lineage(self, source_dataset, source_col, target_dataset, target_col):
        self.lineage_edges.append({
            "source": f"{source_dataset}.{source_col}",
            "target": f"{target_dataset}.{target_col}",
        })

    def search(self, query):
        results = []
        for name, ds in self.datasets.items():
            if query.lower() in name.lower():
                results.append(("dataset", name, ds["description"]))
            for col in ds["columns"]:
                if query.lower() in col["name"].lower():
                    results.append(("column", f"{name}.{col['name']}", col["description"]))
        return results

    def lineage_for(self, dataset_name):
        result = {"upstream": [], "downstream": []}
        for edge in self.lineage_edges:
            src_ds, src_col = edge["source"].split(".")
            tgt_ds, tgt_col = edge["target"].split(".")
            if tgt_ds == dataset_name:
                result["upstream"].append(edge["source"])
            elif src_ds == dataset_name:
                result["downstream"].append(edge["target"])
        return result

# Demo
cat = SimpleDataCatalog()
cat.add_dataset("raw_orders", "Raw orders from e-commerce", "data_eng")
cat.add_column("raw_orders", "order_id", "int", "Primary key")
cat.add_column("raw_orders", "amount", "decimal", "Order total")
cat.add_dataset("daily_revenue", "Aggregated daily revenue", "analytics")
cat.add_column("daily_revenue", "revenue_date", "date", "Calendar date")
cat.add_column("daily_revenue", "total_revenue", "decimal", "Sum of daily orders")
cat.add_lineage("raw_orders", "amount", "daily_revenue", "total_revenue")

print("Search 'revenue':", cat.search("revenue"))
print("\nLineage for daily_revenue:", cat.lineage_for("daily_revenue"))

Expected output:

Search 'revenue': [('dataset', 'daily_revenue', 'Aggregated daily revenue'), ('column', 'daily_revenue.revenue_date', 'Calendar date'), ('column', 'daily_revenue.total_revenue', 'Sum of daily orders')]

Lineage for daily_revenue: {'upstream': ['raw_orders.amount'], 'downstream': []}

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.
