OpenLineage integration for dlt pipelines. Automatically emits lineage events to Marquez or any OpenLineage-compatible backend.
- Table-level lineage: Track which resources a pipeline extracts and which destination tables it writes (no column-level lineage, see reshaping notes)
- Schema capture: Column names and dlt data types for each destination table
- Row counts: Per-table row counts from the normalize step
- Destination-aware namespaces: Output datasets namespaced by destination type and fingerprint
- Per-step events: START, RUNNING (extract), COMPLETE (load), and FAIL
- Processing engine metadata: dlt version and adapter version in every event
```bash
pip install dlt-openlineage
```

Or with uv:

```bash
uv add dlt-openlineage
```

Add two lines before your pipeline runs:
```python
import dlt
import dlt_openlineage

dlt_openlineage.install(url="http://localhost:5000")


@dlt.resource
def users():
    yield [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"},
    ]


pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw_data",
)

pipeline.run(users())
```

That's it. OpenLineage events are emitted automatically during pipeline execution.
You can also configure via environment variables:
```bash
export OPENLINEAGE_URL=http://localhost:5000
export OPENLINEAGE_NAMESPACE=my_project
export OPENLINEAGE_API_KEY=...  # optional, for authenticated endpoints
```

Then:
```python
import dlt_openlineage

dlt_openlineage.install()  # reads from env vars
```

This package implements dlt's `SupportsTracking` protocol and registers via `dlt.pipeline.trace.TRACKING_MODULES`. The tracker intercepts pipeline lifecycle events and emits corresponding OpenLineage events:
| dlt Step | OpenLineage Event | Data Included |
|---|---|---|
| Pipeline start | RunEvent(START) | Job type, processing engine |
| Extract complete | RunEvent(RUNNING) | Input datasets (extracted tables) |
| Load complete | RunEvent(COMPLETE) | Output datasets with schema, row counts, input datasets |
| Any step failure | RunEvent(FAIL) | Error message and stack trace |
| Pipeline end (no load) | RunEvent(COMPLETE) | Fallback terminal event from pipeline schema |
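
To make the registration mechanism concrete, here is a minimal sketch of a custom tracker. The method names follow dlt's `SupportsTracking` protocol; exact signatures and the contents of `TRACKING_MODULES` can vary between dlt versions, so treat this as illustrative rather than the adapter's actual code:

```python
import dlt.pipeline.trace as dlt_trace


class PrintTracker:
    """Toy tracker: prints lifecycle hooks instead of emitting
    OpenLineage events. dlt invokes these hooks on every entry
    registered in TRACKING_MODULES."""

    def on_start_trace(self, trace, step, pipeline):
        print(f"START: pipeline {pipeline.pipeline_name}")

    def on_start_trace_step(self, trace, step, pipeline):
        print(f"step starting: {step}")

    def on_end_trace_step(self, trace, step, pipeline, step_info, send_state):
        # dlt_openlineage emits RUNNING here after extract
        # and COMPLETE here after load
        print(f"step finished: {step}")

    def on_end_trace(self, trace, pipeline, send_state):
        # the fallback terminal event is emitted here if no load ran
        print("trace finished")


# Register next to dlt's built-in trackers.
dlt_trace.TRACKING_MODULES.append(PrintTracker())
```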
Input datasets are derived from dlt's ExtractInfo.metrics, which tracks which tables/resources were extracted. These are the table names as dlt sees them after extraction (i.e., the resource names or table names from table_metrics). Internal dlt tables (_dlt_loads, _dlt_version, etc.) are filtered out.
Input datasets appear in both the RUNNING event (after extract) and the COMPLETE event (after load), so lineage consumers can see the full input-to-output picture on the terminal event.
Inputs are namespaced with the configured namespace (defaults to "dlt"), not the destination, since they represent the logical source data before loading.
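
Concretely, for the quick-start pipeline above, the RUNNING event's inputs would contain a single dataset shaped roughly like this (shown as the Python structure before serialization; illustrative, not verbatim output):

```python
# Illustrative input dataset for the `users` resource.
inputs = [
    {
        "namespace": "dlt",  # configured namespace, NOT the destination
        "name": "users",     # table/resource name from ExtractInfo.metrics
    }
]
```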
Output datasets are built from `LoadInfo.load_packages`, which lists every completed load job with its target table name. Each output dataset includes:

- Namespace: derived from the destination type and fingerprint, e.g. `duckdb://local`, `postgres://myhost.com`, `bigquery://project-id`
- Name: qualified as `{dataset_name}.{table_name}`, e.g. `raw_data.users`, `raw_data.orders`
- Schema facet: column names and dlt data types (e.g. `bigint`, `text`, `decimal`) from the pipeline's default schema
- Row counts: per-table row counts from the normalize step's `NormalizeInfo.row_counts`

If `LoadInfo` isn't available (e.g. extract-only runs), output datasets fall back to the pipeline's `default_schema.tables`.
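
Putting those pieces together, an output dataset for the quick-start pipeline might look roughly like this (facet layout follows the OpenLineage spec; values and the exact facet set are illustrative):

```python
# Illustrative output dataset; values correspond to the quick-start example.
output = {
    "namespace": "duckdb://local",  # destination type + fingerprint
    "name": "raw_data.users",       # {dataset_name}.{table_name}
    "facets": {
        "schema": {  # SchemaDatasetFacet: normalized names, dlt types
            "fields": [
                {"name": "id", "type": "bigint"},
                {"name": "name", "type": "text"},
                {"name": "email", "type": "text"},
            ]
        }
    },
    "outputFacets": {
        # row count from NormalizeInfo.row_counts (2 rows yielded above)
        "outputStatistics": {"rowCount": 2}
    },
}
```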
dlt aggressively reshapes data between extract and load. This affects what shows up in lineage:
- Nested data is flattened: dlt unnests JSON objects and arrays into separate tables. A resource `users` with a nested `addresses` array becomes two destination tables: `users` and `users__addresses`. Both appear as output datasets. The input side just shows `users` (the original resource name); see the sketch after this list.
- Column names are normalized: dlt converts column names to snake_case and applies naming conventions. The schema facet reflects the normalized names as they exist in the destination, not the original source field names.
- Column types are dlt types: schema facets use dlt's internal type system (`bigint`, `text`, `double`, `complex`, `date`, `timestamp`, `wei`, etc.), not the destination's native SQL types.
- No column-level lineage: because dlt's reshaping (flattening, renaming, type coercion) isn't tracked as a transformation DAG, we emit table-level lineage only. There is no `ColumnLineageDatasetFacet`. This is an honest representation: we can tell you that the `users` resource produced the `raw_data.users` and `raw_data.users__addresses` tables, but we can't trace individual columns through dlt's normalizer.
- Internal tables are excluded: dlt creates `_dlt_loads`, `_dlt_pipeline_state`, `_dlt_version`, and similar bookkeeping tables. These are filtered from both input and output datasets.
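
Here is the flattening behavior end to end (the exact bookkeeping columns dlt adds, such as `_dlt_id`, depend on dlt version and configuration):

```python
import dlt


@dlt.resource
def users():
    # One logical record with a nested list of addresses.
    yield {
        "id": 1,
        "name": "Alice",
        "addresses": [{"city": "Berlin"}, {"city": "Paris"}],
    }


pipeline = dlt.pipeline(
    pipeline_name="flatten_demo",  # hypothetical name for this example
    destination="duckdb",
    dataset_name="raw_data",
)
pipeline.run(users())

# Lineage sees ONE input dataset:  users
# ...but TWO output datasets:
#   raw_data.users             (id, name, plus dlt bookkeeping columns)
#   raw_data.users__addresses  (city, plus parent/list-index bookkeeping)
```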
Every event includes:

- `jobType`: `{processingType: "BATCH", integration: "DLT", jobType: "PIPELINE"}`
- `processing_engine`: dlt version and adapter version
- `dlt_execution` (custom facet, on COMPLETE): current step, destination type/name, dataset name, total/failed job counts
- `errorMessage` (on FAIL): error message, programming language, stack trace (the string representation dlt provides)
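
As a rough illustration, the run facets on a COMPLETE event, shown as the Python structure before serialization (the `dlt_execution` field names here are hypothetical placeholders; check the emitted events for the authoritative shape):

```python
# Illustrative run/job facets on a COMPLETE event.
facets = {
    "jobType": {
        "processingType": "BATCH",
        "integration": "DLT",
        "jobType": "PIPELINE",
    },
    "processing_engine": {
        "name": "dlt",
        "version": "1.2.3",                    # dlt version (example)
        "openlineageAdapterVersion": "0.1.0",  # adapter version (example)
    },
    "dlt_execution": {  # custom facet; field names are placeholders
        "step": "load",
        "destination_type": "duckdb",
        "dataset_name": "raw_data",
        "total_jobs": 1,
        "failed_jobs": 0,
    },
}
```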
To try it end to end with Marquez:

```bash
# Start Marquez locally
docker run -p 5000:5000 -p 5001:5001 -p 3000:3000 marquezproject/marquez

# Point your pipeline at it
export OPENLINEAGE_URL=http://localhost:5000

# Run your dlt pipeline
python my_pipeline.py

# View lineage at http://localhost:3000
```

To develop on this package:

```bash
# Install dependencies
uv sync --dev

# Run tests (unit + integration with real dlt+DuckDB)
uv run pytest tests/ -v

# Lint
uv run ruff check src/
```

License: MIT