@@ -22,7 +22,7 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.serialized_objects import SerializedDAG
from pydantic import BaseModel, ValidationError
-from sqlalchemy import join
+from sqlalchemy import and_, func, join
from sqlalchemy.orm import Session

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
@@ -113,6 +113,7 @@ class OMTaskInstance(BaseModel):
    end_date: Optional[datetime]


+# pylint: disable=too-many-locals,too-many-nested-blocks,too-many-boolean-expressions
class AirflowSource(PipelineServiceSource):
    """
    Implements the necessary methods to extract
@@ -387,24 +388,56 @@ def get_pipelines_list(self) -> Iterable[AirflowDagDetails]:
            else SerializedDagModel.data  # For 2.2.5 and 2.1.4
        )

+        # Get the timestamp column for ordering (use last_updated if available, otherwise created_at)
+        timestamp_column = (
+            SerializedDagModel.last_updated
+            if hasattr(SerializedDagModel, "last_updated")
+            else SerializedDagModel.created_at
+        )
+
+        # Create subquery to get the latest timestamp for each DAG
+        # This handles cases where multiple versions exist in the serialized_dag table
+        latest_dag_subquery = (
+            self.session.query(
+                SerializedDagModel.dag_id,
+                func.max(timestamp_column).label("max_timestamp"),
+            )
+            .group_by(SerializedDagModel.dag_id)
+            .subquery()
+        )
+
        # In Airflow 3.x, fileloc is not available on SerializedDagModel
        # We need to get it from DagModel instead
        if hasattr(SerializedDagModel, "fileloc"):
            # Airflow 2.x: fileloc is on SerializedDagModel
+            # Join on the latest-version subquery to get only the latest version of each DAG
            session_query = self.session.query(
                SerializedDagModel.dag_id,
                json_data_column,
                SerializedDagModel.fileloc,
+            ).join(
+                latest_dag_subquery,
+                and_(
+                    SerializedDagModel.dag_id == latest_dag_subquery.c.dag_id,
+                    timestamp_column == latest_dag_subquery.c.max_timestamp,
+                ),
            )
        else:
            # Airflow 3.x: fileloc is only on DagModel, we need to join
-            session_query = self.session.query(
-                SerializedDagModel.dag_id,
-                json_data_column,
-                DagModel.fileloc,
-            ).select_from(
-                join(
-                    SerializedDagModel,
+            session_query = (
+                self.session.query(
+                    SerializedDagModel.dag_id,
+                    json_data_column,
+                    DagModel.fileloc,
+                )
+                .join(
+                    latest_dag_subquery,
+                    and_(
+                        SerializedDagModel.dag_id == latest_dag_subquery.c.dag_id,
+                        timestamp_column == latest_dag_subquery.c.max_timestamp,
+                    ),
+                )
+                .join(
                    DagModel,
                    SerializedDagModel.dag_id == DagModel.dag_id,
                )
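
The `.join(latest_dag_subquery, and_(...))` shape in both branches is the standard greatest-n-per-group pattern: group by the key, take `max()` of the timestamp, then join back on both columns. Below is a minimal, runnable sketch of the same pattern against a hypothetical toy table; the `DagVersion` model and its columns are illustrative stand-ins, not Airflow's real schema.

# A minimal sketch of the greatest-n-per-group pattern used above.
# DagVersion / dag_version are invented names, not Airflow's schema.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, and_, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagVersion(Base):
    """One row per (dag_id, version), like rows piling up in serialized_dag."""

    __tablename__ = "dag_version"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String, nullable=False)
    last_updated = Column(DateTime, nullable=False)
    data = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            DagVersion(dag_id="etl", last_updated=datetime(2024, 1, 1), data="v1"),
            DagVersion(dag_id="etl", last_updated=datetime(2024, 2, 1), data="v2"),
            DagVersion(dag_id="ml", last_updated=datetime(2024, 1, 15), data="v1"),
        ]
    )
    session.commit()

    # Subquery: the latest timestamp for each dag_id
    latest = (
        session.query(
            DagVersion.dag_id,
            func.max(DagVersion.last_updated).label("max_timestamp"),
        )
        .group_by(DagVersion.dag_id)
        .subquery()
    )

    # Join back on (dag_id, timestamp): only the newest row per DAG survives
    rows = (
        session.query(DagVersion.dag_id, DagVersion.data)
        .join(
            latest,
            and_(
                DagVersion.dag_id == latest.c.dag_id,
                DagVersion.last_updated == latest.c.max_timestamp,
            ),
        )
        .all()
    )
    print(rows)  # e.g. [('etl', 'v2'), ('ml', 'v1')]

One property worth noting: if two rows for the same dag_id tie on the max timestamp, both survive the join; the production query above behaves the same way.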
@@ -869,7 +902,7 @@ def get_table_pipeline_observability(

        for cache_key, cached_data in self.observability_cache.items():
            try:
-                dag_id, run_id = cache_key
+                dag_id, _ = cache_key

                # Skip current dag to avoid duplicates
                if dag_id == pipeline_details.dag_id:
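
For context on the `dag_id, _ = cache_key` change: the loop only needs the DAG half of each `(dag_id, run_id)` cache key, so the unused `run_id` becomes `_`. A tiny sketch of the same loop shape, with invented cache contents:

# Toy illustration of the loop above; the cache contents are invented.
observability_cache = {
    ("etl", "run_1"): {"tasks": 3},
    ("etl", "run_2"): {"tasks": 3},
    ("ml", "run_1"): {"tasks": 5},
}
current_dag_id = "etl"

for cache_key, cached_data in observability_cache.items():
    dag_id, _ = cache_key  # run_id is unused, hence the underscore
    if dag_id == current_dag_id:
        continue  # skip the DAG currently being processed to avoid duplicates
    print(dag_id, cached_data)  # -> ml {'tasks': 5}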