In the modern data stack, "Where did this data come from?" is the single most expensive question you can ask.
If you are a Data Engineer, you have lived this nightmare: A dashboard breaks. The CEO is asking why the revenue numbers are wrong. You spend the next 4 hours tracing a CSV export back to a Spark job, which reads from a View, which joins three tables, one of which hasn't updated in 48 hours.
This is the "Data Lineage" problem.
Traditional documentation fails because data relationships are not linear; they form a network. To solve this, we need to treat our infrastructure as a Graph.
In this engineering guide, based on research from Fujitsu's Infrastructure Guild, we will move beyond static diagrams. We will architect a Graph-Based Lineage Engine using Neo4j and Python to programmatically trace dependencies, find root causes, and clean up dead data.
The Architecture: Modeling Your Platform
Most data platforms are treated as isolated silos (S3 buckets, SQL tables, Airflow DAGs). We need to connect them.
The Concept:
- Nodes: The Assets (Tables, Views, Files, Jobs, Dashboards, Users).
- Edges: The Actions (READS_FROM, WRITES_TO, TRANSFORMS_TO, OWNS).
Here is the schema we will implement programmatically:
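In Cypher pattern terms, the model the code below assumes looks roughly like this (a minimal sketch, not an exhaustive catalogue): consumers point at what they read, jobs point at what they write, and every asset points at whatever is derived from it.
- (:User)-[:OWNS]->(:Job)
- (:Job)-[:READS_FROM]->(:Table) and (:Job)-[:WRITES_TO]->(:Table)
- (:Table)-[:TRANSFORMS_TO]->(:Table), (:Table)-[:TRANSFORMS_TO]->(:View), (:View)-[:TRANSFORMS_TO]->(:Dashboard)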
The Tech Stack
- Database: Neo4j (Community Edition or AuraDB)
- Driver: neo4j Python Driver
- Logic: Python 3.9+
Phase 1: The Connection Logic
First, let's create a reusable Python client to interact with our Graph. We aren't just writing queries; we are building an API for our data platform.
from neo4j import GraphDatabase

class LineageGraph:
    """Thin wrapper around the Neo4j driver that exposes a single query method."""

    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def close(self):
        self.driver.close()

    def run_query(self, query, parameters=None):
        # Open a short-lived session per query and return plain dicts
        with self.driver.session() as session:
            result = session.run(query, parameters)
            return [record.data() for record in result]

# Initialize connection
graph_db = LineageGraph("bolt://localhost:7687", "neo4j", "your_password")
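To make the traversals in the next phases concrete, here is a small seeding sketch. The asset names mirror the sample outputs later in this guide, and the edge directions follow the conventions sketched above; in a real deployment these MERGE statements would be emitted by your ingestion scripts rather than hard-coded.

def seed_sample_lineage(graph_client):
    """Loads a tiny example graph so the traversal queries below have something to walk."""
    graph_client.run_query("""
        MERGE (raw:Table {name: 'Raw_Leads'})
        MERGE (clean:Table {name: 'Clean_Leads'})
        MERGE (view:View {name: 'Regional_Sales_View'})
        MERGE (dash:Dashboard {name: 'Q3_Revenue_Report'})
        MERGE (job:Job {name: 'Job_101'})
        MERGE (alice:User {name: 'Alice'})
        MERGE (raw)-[:TRANSFORMS_TO]->(clean)
        MERGE (clean)-[:TRANSFORMS_TO]->(view)
        MERGE (view)-[:TRANSFORMS_TO]->(dash)
        MERGE (job)-[:READS_FROM]->(raw)
        MERGE (job)-[:WRITES_TO]->(clean)
        MERGE (alice)-[:OWNS]->(job)
    """)

# Usage
# seed_sample_lineage(graph_db)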
Phase 2: Impact Analysis (The "Blast Radius")
This is where the graph shines. If the Raw_Leads table is corrupted, what downstream dashboards are broken?
In a standard SQL database, this requires complex recursive CTEs. In Python + Cypher, it is a simple traversal.
The Code: We define a function that takes a table name and walks the graph forward (-[:TRANSFORMS_TO|READS_FROM*]->) to find every dependent asset.
def get_downstream_impact(graph_client, table_name):
    """
    Finds all assets (Views, Files, Dashboards) that depend on a specific table.
    """
    cypher_query = """
    MATCH (source:Table {name: $name})-[:TRANSFORMS_TO|READS_FROM*]->(downstream)
    RETURN DISTINCT downstream.name AS asset_name, labels(downstream) AS asset_type
    """
    results = graph_client.run_query(cypher_query, parameters={"name": table_name})

    print(f"Blast Radius for '{table_name}':")
    for record in results:
        print(f"  → [{record['asset_type'][0]}] {record['asset_name']}")
    return results
# Usage
# get_downstream_impact(graph_db, "Raw_Leads")
Output:
Blast Radius for 'Raw_Leads':
→ [Table] Clean_Leads
→ [View] Regional_Sales_View
→ [Dashboard] Q3_Revenue_Report
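To act on this before it hurts, you can gate destructive changes on the blast radius. Here is a hypothetical assert_safe_to_drop helper; it reuses get_downstream_impact (which returns its result list) and refuses to proceed while any downstream asset remains.

def assert_safe_to_drop(graph_client, table_name):
    """Refuses a DROP while the table still has downstream dependents."""
    dependents = get_downstream_impact(graph_client, table_name)
    if dependents:
        names = ", ".join(d["asset_name"] for d in dependents)
        raise RuntimeError(f"Cannot drop '{table_name}': still feeds {names}")

# Usage
# assert_safe_to_drop(graph_db, "Raw_Leads")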
Phase 3: Root Cause Analysis (The "Time Machine")
When a report is wrong, you need to trace it backwards to the source. Who changed the code? Which ETL job touched it last?
The Code: We walk the graph in reverse (<-[:WRITES_TO|TRANSFORMS_TO*]-) to find the upstream lineage and the owner responsible.
def trace_root_cause(graph_client, artifact_name):
    """
    Traces backwards from a broken report to find the source tables and owners.
    """
    cypher_query = """
    MATCH (destination {name: $name})<-[:WRITES_TO|TRANSFORMS_TO*]-(upstream)
    OPTIONAL MATCH (upstream)<-[:OWNS]-(owner:User)
    RETURN DISTINCT upstream.name AS source, labels(upstream)[0] AS type, owner.name AS owner
    """
    results = graph_client.run_query(cypher_query, parameters={"name": artifact_name})

    print(f"Root Cause Trace for '{artifact_name}':")
    for record in results:
        owner = record['owner'] if record['owner'] else "Unknown"
        print(f"  ← Upstream [{record['type']}] {record['source']} (Owner: {owner})")
    return results
# Usage
# trace_root_cause(graph_db, "Q3_Revenue_Report")
Scenario: This script might reveal that Q3_Revenue_Report reads from Clean_Leads, which was updated by Job_101, owned by Alice. You now know exactly who to Slack.
Phase 4: Automated Cleanup (Data Value)
Organizations struggle to delete old data because they don't know who uses it. We can programmatically calculate the "Centrality" (or "Popularity") of a table from the lineage edges that touch it.
If a table has no outgoing TRANSFORMS_TO edges and no incoming READS_FROM edges in the graph, it is an "Orphan."
The Code:
def find_unused_assets(graph_client):
    """
    Identifies tables that have no downstream dependencies (Orphans).
    """
    cypher_query = """
    MATCH (t:Table)
    WHERE NOT (t)-[:TRANSFORMS_TO]->() AND NOT ()-[:READS_FROM]->(t)
    RETURN t.name AS table_name, t.created_at AS created_date
    """
    results = graph_client.run_query(cypher_query)

    print("🗑️ Candidate Tables for Deletion (No Dependencies):")
    for record in results:
        print(f"  - {record['table_name']} (Created: {record['created_date']})")
    return results
# Usage
# find_unused_assets(graph_db)
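The same idea extends beyond orphans: the "Popularity" mentioned above can be approximated as the number of incoming READS_FROM edges. Here is a hypothetical rank_table_popularity helper along those lines.

def rank_table_popularity(graph_client):
    """Ranks tables by how many assets read from them (a simple in-degree score)."""
    cypher_query = """
    MATCH (t:Table)
    OPTIONAL MATCH (t)<-[r:READS_FROM]-()
    RETURN t.name AS table_name, count(r) AS reader_count
    ORDER BY reader_count DESC
    """
    return graph_client.run_query(cypher_query)

# Usage
# rank_table_popularity(graph_db)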
Developer Insight: You can hook find_unused_assets into a Slack bot that runs every Monday: "Here are 5 tables that haven't been queried in 6 months. Delete them?"
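A minimal sketch of that bot, assuming a standard Slack incoming webhook and the requests library (the webhook URL, wording, and scheduling are up to you); it relies on find_unused_assets returning its result list.

import requests

def post_orphans_to_slack(graph_client, webhook_url):
    """Posts the current orphan list to a Slack channel via an incoming webhook."""
    orphans = find_unused_assets(graph_client)
    if not orphans:
        return
    lines = [f"- {o['table_name']} (created {o['created_date']})" for o in orphans]
    requests.post(webhook_url, json={"text": "Candidate tables for deletion:\n" + "\n".join(lines)})

# Usage
# post_orphans_to_slack(graph_db, "https://hooks.slack.com/services/...")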
Conclusion: Visualizing the Invisible
By wrapping Cypher queries in Python, we move from "Manual Documentation" to "Programmable Lineage."
The ROI of this code:
- Instant Debugging: Replace hours of log-diving with a single function call (trace_root_cause).
- Safety: Never break a dashboard again because you checked get_downstream_impact before dropping a column.
- Cost Savings: Automatically identify and delete unused storage (find_unused_assets).
Your Next Step: Export your information_schema and query logs, load them into the graph, run the Python scripts above, and finally see what your data platform actually looks like.
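As a starting point, here is a hypothetical bootstrap sketch. It assumes you have exported table-to-table dependencies into a CSV (an illustrative lineage_export.csv with source_name and target_name columns); adapt the parsing to whatever your warehouse's information_schema and query logs actually give you.

import csv

def bootstrap_from_csv(graph_client, path="lineage_export.csv"):
    """Loads exported (source, target) dependency pairs as TRANSFORMS_TO edges."""
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            graph_client.run_query(
                """
                MERGE (s:Table {name: $source})
                MERGE (t:Table {name: $target})
                MERGE (s)-[:TRANSFORMS_TO]->(t)
                """,
                parameters={"source": row["source_name"], "target": row["target_name"]},
            )

# Usage
# bootstrap_from_csv(graph_db)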