
Lineage

DataHub’s Python SDK allows you to programmatically define and retrieve lineage between metadata entities. With the DataHub Lineage SDK, you can:

  • Add table-level and column-level lineage across datasets, data jobs, dashboards, and charts
  • Automatically infer lineage from SQL queries
  • Read lineage (upstream or downstream) for a given entity or column
  • Filter lineage results using structured filters

Getting Started

To use the DataHub SDK, you'll need to install acryl-datahub and set up a connection to your DataHub instance. Follow the installation guide to get started.

Connect to your DataHub instance:

from datahub.sdk import DataHubClient

client = DataHubClient(server="<your_server>", token="<your_token>")
  • server: The URL of your DataHub GMS server
    • local: http://localhost:8080
    • hosted: https://<your_datahub_url>/gms
  • token: You'll need to generate a Personal Access Token from your DataHub instance.

Add Lineage

The add_lineage() method allows you to define lineage between two entities.

Add Entity Lineage

You can create lineage between two datasets, data jobs, dashboards, or charts. The upstream and downstream parameters should be the URNs of the entities you want to link.

Add Entity Lineage Between Datasets

# Inlined from /metadata-ingestion/examples/library/add_lineage_dataset_to_dataset.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

upstream_urn = DatasetUrn(platform="snowflake", name="sales_raw")
downstream_urn = DatasetUrn(platform="snowflake", name="sales_cleaned")
client.lineage.add_lineage(upstream=upstream_urn, downstream=downstream_urn)

Add Entity Lineage Between Data Jobs

# Inlined from /metadata-ingestion/examples/library/lineage_datajob_to_datajob.py
from datahub.metadata.urns import DataFlowUrn, DataJobUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

dataflow_urn = DataFlowUrn(
    orchestrator="airflow", flow_id="data_pipeline", cluster="PROD"
)

client.lineage.add_lineage(
    upstream=DataJobUrn(flow=dataflow_urn, job_id="data_job_1"),
    downstream=DataJobUrn(flow=dataflow_urn, job_id="data_job_2"),
)

Lineage Combinations

For supported lineage combinations, see Supported Lineage Combinations.

Add Column Lineage

You can add column-level lineage by using the column_lineage parameter when linking datasets.

Add Column Lineage with Fuzzy Matching

# Inlined from /metadata-ingestion/examples/library/lineage_dataset_column.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

client.lineage.add_lineage(
    upstream=DatasetUrn(platform="snowflake", name="sales_raw"),
    downstream=DatasetUrn(platform="snowflake", name="sales_cleaned"),
    column_lineage=True,  # same as "auto_fuzzy", which maps columns based on name similarity
)

When column_lineage is set to True, DataHub automatically maps columns based on their names, allowing for fuzzy matching. This is useful when upstream and downstream datasets have similar but not identical column names (e.g. customer_id in the upstream and CustomerId in the downstream). See Column Lineage Options for more details.

Add Column Lineage with Strict Matching

# Inlined from /metadata-ingestion/examples/library/lineage_dataset_column_auto_strict.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

client.lineage.add_lineage(
    upstream=DatasetUrn(platform="snowflake", name="sales_raw"),
    downstream=DatasetUrn(platform="snowflake", name="sales_cleaned"),
    column_lineage="auto_strict",
)

This will create column-level lineage with strict matching, meaning the column names must match exactly between upstream and downstream datasets.

Add Column Lineage with Custom Mapping

For custom mapping, you can use a dictionary where keys are downstream column names and values are lists of upstream column names. This allows you to specify complex relationships.

# Inlined from /metadata-ingestion/examples/library/lineage_dataset_column_custom_mapping.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk import DataHubClient

client = DataHubClient.from_env()

client.lineage.add_lineage(
    upstream=DatasetUrn(platform="snowflake", name="sales_raw"),
    downstream=DatasetUrn(platform="snowflake", name="sales_cleaned"),
    # { downstream_column -> [upstream_columns] }
    column_lineage={
        "id": ["id"],
        "region": ["region", "region_id"],
        "total_revenue": ["revenue"],
    },
)

Infer Lineage from SQL

You can infer lineage directly from a SQL query using infer_lineage_from_sql(). This will parse the query, determine upstream and downstream datasets, and automatically add lineage (including column-level lineage when possible) and a query node showing the SQL transformation logic.

# Inlined from /metadata-ingestion/examples/library/lineage_dataset_from_sql.py
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

sql_query = """
CREATE TABLE sales_summary AS
SELECT
    p.product_name,
    c.customer_segment,
    SUM(s.quantity) as total_quantity,
    SUM(s.amount) as total_sales
FROM sales s
JOIN products p ON s.product_id = p.id
JOIN customers c ON s.customer_id = c.id
GROUP BY p.product_name, c.customer_segment
"""

# sales_summary will be assumed to be in the default db/schema
# e.g. prod_db.public.sales_summary
client.lineage.infer_lineage_from_sql(
    query_text=sql_query,
    platform="snowflake",
    default_db="prod_db",
    default_schema="public",
)

DataHub SQL Parser

Check out more information on how we handle SQL parsing below.

Add Query Node with Lineage

If you provide a transformation_text to add_lineage, DataHub will create a query node that represents the transformation logic. This is useful for tracking how data is transformed between datasets.

# Inlined from /metadata-ingestion/examples/library/add_lineage_dataset_to_dataset_with_query_node.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

upstream_urn = DatasetUrn(platform="snowflake", name="upstream_table")
downstream_urn = DatasetUrn(platform="snowflake", name="downstream_table")

transformation_text = """
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("HighValueFilter").getOrCreate()
df = spark.read.table("customers")
high_value = df.filter("lifetime_value > 10000")
high_value.write.saveAsTable("high_value_customers")
"""

client.lineage.add_lineage(
    upstream=upstream_urn,
    downstream=downstream_urn,
    transformation_text=transformation_text,
    column_lineage={"id": ["id", "customer_id"]},
)

# By passing transformation_text, the query node will be created alongside the table-level lineage.
# transformation_text can be any transformation logic, e.g. a Spark job, an Airflow DAG, or a Python script.
# If you have a SQL query, we recommend using infer_lineage_from_sql instead.
# Note that transformation_text itself will not create column-level lineage.

Transformation text can be any transformation logic: Python scripts, Airflow DAG code, or any other code that describes how the upstream dataset is transformed into the downstream dataset.

note

Providing transformation_text will NOT create column lineage. You need to specify the column_lineage parameter to enable column-level lineage.

If you have a SQL query that describes the transformation, you can use infer_lineage_from_sql to automatically parse the query and add column level lineage.

Get Lineage

The get_lineage() method allows you to retrieve lineage for a given entity.

Get Entity Lineage

Get Downstream Lineage for a Dataset

This returns the direct downstream entities that depend on the dataset. By default, get_lineage() retrieves only the immediate entities (1 hop).

# Inlined from /metadata-ingestion/examples/library/get_lineage_basic.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

downstream_lineage = client.lineage.get_lineage(
    source_urn=DatasetUrn(platform="snowflake", name="sales_summary"),
    direction="downstream",
)

print(downstream_lineage)

Get Downstream Lineage for a Dataset Across Multiple Hops

To get upstream/downstream entities that are more than one hop away, you can use the max_hops parameter. This allows you to traverse the lineage graph up to a specified number of hops.

# Inlined from /metadata-ingestion/examples/library/get_lineage_with_hops.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

downstream_lineage = client.lineage.get_lineage(
    source_urn=DatasetUrn(platform="snowflake", name="sales_summary"),
    direction="downstream",
    max_hops=2,
)

print(downstream_lineage)


USING MAX_HOPS

If you provide max_hops greater than 2, the traversal covers the full lineage graph and the results are limited by count.

Return Type

get_lineage() returns a list of LineageResult objects.

results = [
    LineageResult(
        urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,table_2,PROD)",
        type="DATASET",
        hops=1,
        direction="downstream",
        platform="snowflake",
        name="table_2",  # name of the entity
        paths=[],  # only populated for column-level lineage
    )
]
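Once retrieved, the results can be post-processed like any Python list. As a sketch, using plain dicts as stand-ins for LineageResult objects (the field names are assumed to mirror those shown above), you can group entities by their distance from the source:

```python
from collections import defaultdict

# Stand-ins for the LineageResult objects shown above.
results = [
    {"urn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,table_2,PROD)", "hops": 1},
    {"urn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,table_3,PROD)", "hops": 2},
]

# Group entity URNs by how many hops away they sit from the source entity.
by_hops = defaultdict(list)
for result in results:
    by_hops[result["hops"]].append(result["urn"])

print(by_hops[1])  # only the direct (1-hop) entities
```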

Get Column-Level Lineage

Get Downstream Lineage for a Dataset Column

You can retrieve column-level lineage by specifying the source_column parameter. This will return lineage paths that include the specified column.

# Inlined from /metadata-ingestion/examples/library/get_column_lineage.py
from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

# Get column lineage for the entire flow.
# You can pass source_urn and source_column to get lineage for a specific column.
# Alternatively, you can pass a SchemaFieldUrn as source_urn, e.g.
# source_urn="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,downstream_table,PROD),id)"
downstream_column_lineage = client.lineage.get_lineage(
    source_urn=DatasetUrn(platform="snowflake", name="sales_summary"),
    source_column="id",
    direction="downstream",
)

print(downstream_column_lineage)

You can also pass SchemaFieldUrn as the source_urn to get column-level lineage.

# Inlined from /metadata-ingestion/examples/library/get_column_lineage_from_schemafield.py
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

# Get column lineage for the entire flow
results = client.lineage.get_lineage(
    source_urn="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_summary,PROD),id)",
    direction="downstream",
)

print(list(results))


Return Type

The return type is the same as for entity lineage, but with an additional paths field that contains column lineage paths.

results = [
    LineageResult(
        urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,table_2,PROD)",
        type="DATASET",
        hops=1,
        direction="downstream",
        platform="snowflake",
        name="table_2",  # name of the entity
        paths=[
            LineagePath(
                urn="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_1,PROD),col1)",
                column_name="col1",  # name of the column
                entity_name="table_1",  # name of the entity that contains the column
            ),
            LineagePath(
                urn="urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_2,PROD),col4)",
                column_name="col4",  # name of the column
                entity_name="table_2",  # name of the entity that contains the column
            ),
        ],  # only populated for column-level lineage
    )
]

For more details on how to interpret the results, see Interpreting Lineage Results.

Filter Lineage Results

You can filter by platform, type, domain, environment, and more.

# Inlined from /metadata-ingestion/examples/library/get_lineage_with_filter.py
from datahub.sdk.main_client import DataHubClient
from datahub.sdk.search_filters import FilterDsl as F

client = DataHubClient.from_env()

# get upstream snowflake production datasets.
results = client.lineage.get_lineage(
    source_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,sales_agg,PROD)",
    direction="upstream",
    filter=F.and_(F.platform("snowflake"), F.entity_type("dataset"), F.env("PROD")),
)

print(results)

You can check more details about the available filters in the Search SDK documentation.

Lineage SDK Reference

For a full reference, see the lineage SDK reference.

Supported Lineage Combinations

The Lineage APIs support the following entity combinations:

Upstream Entity    Downstream Entity
Dataset            Dataset
Dataset            DataJob
DataJob            DataJob
Dataset            Dashboard
Chart              Dashboard
Dashboard          Dashboard
Dataset            Chart

ℹ️ Column-level lineage and creating a query node with transformation text are only supported for Dataset → Dataset lineage.

Column Lineage Options

For dataset-to-dataset lineage, you can specify the column_lineage parameter in add_lineage() in several ways:

Value             Description
False             Disable column-level lineage (default)
True              Enable column-level lineage with automatic mapping (same as "auto_fuzzy")
"auto_fuzzy"      Enable column-level lineage with fuzzy matching (useful for similar column names)
"auto_strict"     Enable column-level lineage with strict matching (exact column names required)
Column Mapping    A dictionary mapping downstream column names to lists of upstream column names

auto_fuzzy vs auto_strict
  • auto_fuzzy: Automatically matches columns based on similar names, allowing for some flexibility in naming conventions. For example, these two columns would be considered a match:
    • user_id → userId
    • customer_id → CustomerId
  • auto_strict: Requires exact column name matches between upstream and downstream datasets. For example, customer_id in the upstream dataset must match customer_id in the downstream dataset exactly.
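DataHub's exact fuzzy-matching heuristic isn't spelled out here, but conceptually auto_fuzzy compares normalized column names while auto_strict compares them verbatim. A rough, illustrative sketch (not DataHub's actual implementation):

```python
import re

def normalize(name: str) -> str:
    # Lowercase and strip underscores/whitespace so that
    # user_id, userId, and USER_ID all compare equal.
    return re.sub(r"[_\s]", "", name).lower()

# Fuzzy-style comparison: these pairs match.
assert normalize("user_id") == normalize("userId")
assert normalize("customer_id") == normalize("CustomerId")

# Strict-style comparison: only identical names match.
assert "customer_id" != "CustomerId"
```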

Interpreting Column Lineage Results

When retrieving column-level lineage, the results include paths that show how columns are related across datasets. Each path is a list of column URNs that represent the lineage from the source column to the target column.

For example, let's say we have the following lineage across three tables: col1 in table_1 feeds col4 and col5 in table_2, and col4 in table_2 feeds col7 in table_3.

Example with max_hops=1

>>> client.lineage.get_lineage(
...     source_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,table_1,PROD)",
...     source_column="col1",
...     direction="downstream",
...     max_hops=1,
... )

Returns:

[
    {
        "urn": "...table_2...",
        "hops": 1,
        "paths": [
            ["...table_1.col1", "...table_2.col4"],
            ["...table_1.col1", "...table_2.col5"]
        ]
    }
]

Example with max_hops=2

>>> client.lineage.get_lineage(
...     source_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,table_1,PROD)",
...     source_column="col1",
...     direction="downstream",
...     max_hops=2,
... )

Returns:

[
    {
        "urn": "...table_2...",
        "hops": 1,
        "paths": [
            ["...table_1.col1", "...table_2.col4"],
            ["...table_1.col1", "...table_2.col5"]
        ]
    },
    {
        "urn": "...table_3...",
        "hops": 2,
        "paths": [
            ["...table_1.col1", "...table_2.col4", "...table_3.col7"]
        ]
    }
]
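Each element of a path is a schemaField URN. If you only need the column names, you can parse them out of the URNs; a minimal sketch assuming the URN layout shown above, where the column name is the last comma-separated token before the closing parenthesis:

```python
def column_from_schema_field_urn(urn: str) -> str:
    # A schemaField URN looks like:
    #   urn:li:schemaField:(<dataset urn>,<column name>)
    # so the column name is the final comma-separated token.
    return urn.rsplit(",", 1)[1].rstrip(")")

path = [
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_1,PROD),col1)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_2,PROD),col4)",
]
print([column_from_schema_field_urn(u) for u in path])  # ['col1', 'col4']
```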

Alternative: Lineage GraphQL API

While we generally recommend using the Python SDK for lineage, you can also use the GraphQL API to add and retrieve lineage.

Add Lineage Between Datasets with GraphQL

mutation updateLineage {
  updateLineage(
    input: {
      edgesToAdd: [
        {
          downstreamUrn: "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"
          upstreamUrn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)"
        }
      ]
      edgesToRemove: []
    }
  )
}

Get Downstream Lineage with GraphQL

query scrollAcrossLineage {
  scrollAcrossLineage(
    input: {
      query: "*"
      urn: "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"
      count: 10
      direction: DOWNSTREAM
      orFilters: [
        {
          and: [
            {
              condition: EQUAL
              negated: false
              field: "degree"
              values: ["1", "2", "3+"]
            }
          ]
        }
      ]
    }
  ) {
    searchResults {
      degree
      entity {
        urn
        type
      }
    }
  }
}

FAQ

Can I get lineage at the column level? Yes — for dataset-to-dataset lineage, both add_lineage() and get_lineage() support column-level lineage.

Can I pass a SQL query and get lineage automatically? Yes — use infer_lineage_from_sql() to parse a query and extract table and column lineage.

Can I use filters when retrieving lineage? Yes — get_lineage() accepts structured filters via FilterDsl, just like in the Search SDK.