Generating Data Lineage with Database Tables DDL (DAG Gen)
In legacy systems, viewing data lineage and monitoring data movement can be a significant challenge. To address this pain point, I've developed a Proof of Concept (POC) that lets you scan the database, configure dependencies, and automatically generate data lineage.
This project provides a solution for generating and visualizing data lineage based on table DDLs and ETL job metadata. The solution includes a Python script for generating data lineage information and a React application for visualizing the data lineage as a Directed Acyclic Graph (DAG).
How to Create the Data Lineage?
- Method 1: Lineage by parsing. Automatically reads the logic used to process data (SQL, ETL scripts) and derives the lineage from it.
- Method 2: Lineage by data tagging. The transformation engine tags each piece of data it transforms or moves, and the tags are later collected to reconstruct the lineage.
- Method 3: Pattern-based lineage. Instead of dealing with the code that transforms the data, it relies on metadata and looks for patterns between datasets to infer the lineage (see the sketch below).
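As a concrete illustration of Method 3, here is a minimal Python sketch that infers edges purely from a table-naming convention. The stg_ prefix rule and the table names are assumptions made for this example, not part of the project:

import re

def infer_edges_by_pattern(tables):
    """Infer lineage edges from table-name metadata alone (no code parsing)."""
    edges = []
    names = set(tables)
    for table in tables:
        match = re.match(r"stg_(.+)$", table)
        # Assumed convention: a staging table stg_X feeds the target table X.
        if match and match.group(1) in names:
            edges.append({"from": table, "to": match.group(1)})
    return edges

print(infer_edges_by_pattern(["stg_orders", "orders", "stg_customers", "customers"]))
# [{'from': 'stg_orders', 'to': 'orders'}, {'from': 'stg_customers', 'to': 'customers'}]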
Lineage Generation
Workflow: Read DDL → Configure DAG and Dependencies → Update DDL → Generate DAG Diagram
Step 1: Read DDL
Extract DDL Information
- Set up a database connection: Establish a connection to the database where the DDL is stored. Use libraries like psycopg2 for PostgreSQL, pyodbc for SQL Server, cx_Oracle for Oracle, or mysql-connector-python for MySQL.
- Fetch DDL statements: Retrieve the DDL statements from the database. This can be done by querying the database system tables or using database-specific commands.
import psycopg2

def fetch_ddl(connection_string):
    """Connect to PostgreSQL and return a {table_name: ddl} mapping."""
    conn = psycopg2.connect(connection_string)
    cursor = conn.cursor()
    cursor.execute(
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = 'public'"
    )
    tables = cursor.fetchall()
    ddl_statements = {}
    for (table_name,) in tables:
        # NOTE: pg_get_tabledef() is not a built-in PostgreSQL function; it
        # must be installed as a helper, or replaced with a query against
        # information_schema.columns that reconstructs the CREATE TABLE.
        cursor.execute("SELECT pg_get_tabledef(%s)", (table_name,))
        ddl_statements[table_name] = cursor.fetchone()[0]
    cursor.close()
    conn.close()
    return ddl_statements
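A hedged usage example; the credentials are placeholders and match the connection string shown later in the Setup section:

# Replace the placeholder credentials with your own database settings.
connection_string = "dbname='postgres' user='admin' password='admin' host='localhost' port='5432'"
ddl = fetch_ddl(connection_string)
for table, statement in ddl.items():
    print(f"-- {table}\n{statement}\n")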
Step 2: Configure DAG and Dependencies
To monitor data runtime and job runtime at the same time, start with two definitions:
- Define Metadata: Create metadata to describe the tables, columns, and relationships.
- Define ETL Jobs: Identify and define the ETL jobs that affect the tables.
lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3
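A minimal sketch for loading and sanity-checking this metadata with PyYAML. The metadata/metadata.yaml path matches the Setup section below; the validation rule (every edge must connect declared tables) is an assumption about what is worth checking:

import yaml  # pip install pyyaml

def load_metadata(path="metadata/metadata.yaml"):
    """Load the lineage/edges metadata and run a basic sanity check."""
    with open(path) as f:
        metadata = yaml.safe_load(f)
    tables = set(metadata.get("lineage", {}))
    for edge in metadata.get("edges", []):
        # Every edge should connect two tables declared under 'lineage'.
        if edge["from"] not in tables or edge["to"] not in tables:
            raise ValueError(f"Edge references an undeclared table: {edge}")
    return metadata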
Step 3: Update the DDL of Tables
Modify DDL Statements
- Modify Columns or Add Constraints: Update the DDL statements if necessary (e.g., adding constraints, indices).
def update_ddl(ddl_statements, updates):
    """Append an update clause (e.g., an ALTER TABLE statement) to each table's DDL."""
    for table, update in updates.items():
        if table in ddl_statements:
            ddl_statements[table] += '\n' + update
    return ddl_statements
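For example (the ALTER TABLE statement is purely illustrative):

ddl = {"table1": "CREATE TABLE table1 (id INT);"}
updates = {"table1": "ALTER TABLE table1 ADD CONSTRAINT pk_table1 PRIMARY KEY (id);"}
print(update_ddl(ddl, updates)["table1"])
# CREATE TABLE table1 (id INT);
# ALTER TABLE table1 ADD CONSTRAINT pk_table1 PRIMARY KEY (id);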
Step 4: Generate the Lineage with Chart
Generate Data Lineage
- Parse DDL Statements: Extract table relationships and dependencies from the DDL statements.
- Create Lineage Graph: Use libraries like graphviz or networkx to generate the lineage graph. This project exports the graph as JSON for the React visualizer:
import json

def export_dag_json(metadata, filename='dag.json'):
    """Build nodes and edges from the metadata and write them to a JSON file."""
    dag_data = {"nodes": [], "edges": []}
    for table, details in metadata['lineage'].items():
        step = details.get('step', 'unknown')
        etl_job = details.get('elt_job', 'unknown')
        dag_data["nodes"].append({
            "id": table,
            "label": f"{table}\nStep: {step}\nJob: {etl_job}",
            "data": {
                "step": step,
                "etl_job": etl_job
            }
        })
    for edge in metadata.get('edges', []):
        dag_data["edges"].append({
            "id": f"{edge['from']}-{edge['to']}",
            "source": edge['from'],
            "target": edge['to']
        })
    # Persist the DAG so the React visualizer can load it.
    with open(filename, 'w') as f:
        json.dump(dag_data, f, indent=2)
    return dag_data
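Putting the pieces together (load_metadata is the sketch from Step 2 above):

metadata = load_metadata("metadata/metadata.yaml")
export_dag_json(metadata, filename="dag.json")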
Detailed Solution Breakdown
- Database Connection and DDL Extraction: Create a Python script to connect to the database and extract the DDL statements.
- DAG Configuration: Define a YAML or JSON file to store metadata about tables, columns, and ETL jobs.
- DDL Updates: Write a function to update the DDL statements based on new requirements.
- Data Lineage Generation: Create a function to parse the metadata and generate a lineage graph using graphviz (see the sketch below).
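A minimal graphviz sketch over the same metadata structure. Rendering to PNG is an assumption for illustration; the project's own exporter writes dag.json for the React visualizer instead:

import graphviz  # pip install graphviz (also requires the Graphviz binaries)

def render_lineage_png(metadata, filename="lineage"):
    """Render the lineage metadata as a left-to-right DAG image."""
    dot = graphviz.Digraph(graph_attr={"rankdir": "LR"})
    for table, details in metadata["lineage"].items():
        label = f"{table}\nStep: {details.get('step', '?')}\nJob: {details.get('elt_job', '?')}"
        dot.node(table, label)
    for edge in metadata.get("edges", []):
        dot.edge(edge["from"], edge["to"])
    dot.render(filename, format="png", cleanup=True)  # writes lineage.png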
Project Setup
Directory Structure:
.
├── README.md
├── dag-generator
├── dag-visualizer
├── docker-compose.yml
├── docs
└── requirements.txt
Lineage Consumption
The lineage is generated in JSON format with nodes and edges, where each node is a dataset and each edge is a connection between datasets.
The React app uses import ForceGraph2D from 'react-force-graph-2d'; to render the lineage of data movement.
How the nodes and edges are visualized:
<ForceGraph2D
  graphData={graphData}
  width={800}
  height={600}
  nodeLabel={node => node.label}
  nodeCanvasObjectMode={() => 'before'}
  nodeCanvasObject={(node, ctx, globalScale) => {
    const label = node.id;
    const fontSize = 12 / globalScale;
    ctx.font = `${fontSize}px Sans-Serif`;
    ctx.textAlign = 'center';
    ctx.textBaseline = 'middle';
    ctx.fillStyle = 'black'; // node label color
    ctx.fillText(label, node.x, node.y + fontSize);
    // Root nodes (master jobs) get a larger orange marker.
    if (rootNodes.includes(node.id)) {
      ctx.beginPath();
      ctx.arc(node.x, node.y, 10 / globalScale, 0, 2 * Math.PI, false);
      ctx.fillStyle = 'orange';
      ctx.fill();
    }
  }}
  linkDirectionalArrowLength={6}
  linkDirectionalArrowRelPos={1}
  onNodeClick={handleNodeClick}
  linkColor={link => (highlightedEdges.includes(link.id) ? 'red' : 'gray')}
/>
Setup
Prerequisites:
- Python 3.x
- Node.js and npm
- A PostgreSQL database (or any other supported database)
- Step#1 Clone the project: check out the repository, which contains both components (dag-gen, dag-viz)
git clone https://github.com/lognbuivan/data-lineage-visualization.git
cd data-lineage-visualization
- Step#2 Install dependencies: install the required Python libraries
pip install -r requirements.txt
- Step#3 Configure the database connection and metadata in metadata/metadata.yaml.
Change the connection string to point to your database:
connection_string = "dbname='postgres' user='admin' password='admin' host='localhost' port='5432'"
Change the dependencies and lineage configuration:
lineage:
  table1:
    step: 1
    elt_job: job1
  table2:
    step: 2
    elt_job: job2
  table3:
    step: 2
    elt_job: job3
edges:
  - from: table1
    to: table2
  - from: table2
    to: table3
- Step#4 Run the scripts: start the dag-gen
python src/main.py
- Step#5 Visualize the lineage: change into the dag-viz directory and run the React app
cd dag-visualizer
npm install
npm start
Usage
Backend:
The main.py script connects to your PostgreSQL database, extracts the table schemas, and generates a JSON file (dag.json) representing the data lineage.
Frontend:
The React application (dag-visualizer) loads the dag.json file and visualizes the data lineage as a DAG. Click on any node to highlight the edges connected to that node. Root nodes (master jobs) are highlighted with a different color.
Example Data
An example dag.json file structure:
{
  "nodes": [
    {
      "id": "table1",
      "label": "table1\nStep: 1\nJob: job1",
      "data": {
        "step": 1,
        "etl_job": "job1"
      }
    },
    ...
  ],
  "edges": [
    {
      "id": "table1-table11",
      "source": "table1",
      "target": "table11"
    },
    ...
  ]
}
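A small sketch for consuming dag.json outside the React app, e.g., to sanity-check that every edge references a known node before visualizing (the check itself is an assumption about what is useful to verify):

import json

with open("dag.json") as f:
    dag = json.load(f)

node_ids = {node["id"] for node in dag["nodes"]}
for edge in dag["edges"]:
    # An edge whose source/target has no matching node id cannot be
    # resolved by the visualizer.
    assert edge["source"] in node_ids and edge["target"] in node_ids, edge
print(f"{len(node_ids)} nodes, {len(dag['edges'])} edges - all edges reference known nodes")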
Conclusion
This solution provides a detailed outline for setting up a project that generates data lineage from table DDL: extracting the DDL, configuring the DAG and dependencies, updating the DDL, and generating the lineage chart.
Contribution
TBU…