From 4b42e406b76c17825479bada08f1fb43403da6ea Mon Sep 17 00:00:00 2001 From: Gayathri Srividya Rajavarapu Date: Thu, 18 Jun 2026 13:51:46 +0530 Subject: [PATCH] fix: filter iceberg_type in all SqlCatalog table operations --- pyiceberg/catalog/sql.py | 33 ++++++++++++++++- tests/catalog/test_sql.py | 76 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 87446bd58b..28ea0777f2 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -26,7 +26,9 @@ create_engine, delete, insert, + or_, select, + text, union, update, ) @@ -85,6 +87,9 @@ class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase): pass +ICEBERG_TABLE_TYPE = "TABLE" + + class IcebergTables(SqlCatalogBaseTable): __tablename__ = "iceberg_tables" @@ -93,6 +98,7 @@ class IcebergTables(SqlCatalogBaseTable): table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True) metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True) previous_metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True) + iceberg_type: Mapped[str | None] = mapped_column(String(255), nullable=True) class IcebergNamespaceProperties(SqlCatalogBaseTable): @@ -147,6 +153,17 @@ def _ensure_tables_exist(self) -> None: self.create_tables() return + # Idempotently add iceberg_type column if it does not exist yet + # (backward-compatible migration for databases created before this column was introduced). + # Older SQLite versions do not support "ADD COLUMN IF NOT EXISTS", so we catch and ignore + # the error if the column is already present. + with Session(self.engine) as session: + try: + session.execute(text("ALTER TABLE iceberg_tables ADD COLUMN iceberg_type VARCHAR(255)")) + session.commit() + except (OperationalError, ProgrammingError): + pass # column already exists + def create_tables(self) -> None: SqlCatalogBaseTable.metadata.create_all(self.engine) @@ -231,6 +248,7 @@ def create_table( table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, + iceberg_type=ICEBERG_TABLE_TYPE, ) ) session.commit() @@ -273,6 +291,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, + iceberg_type=ICEBERG_TABLE_TYPE, ) ) session.commit() @@ -305,6 +324,7 @@ def load_table(self, identifier: str | Identifier) -> Table: IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) result = session.scalar(stmt) if result: @@ -331,6 +351,7 @@ def drop_table(self, identifier: str | Identifier) -> None: IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) ) if res.rowcount < 1: @@ -344,6 +365,7 @@ def drop_table(self, identifier: str | Identifier) -> None: IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) .one() ) @@ -385,6 +407,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) .values(table_namespace=to_namespace, table_name=to_table_name) ) @@ -400,6 +423,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) .one() ) @@ -462,6 +486,7 @@ def commit_table( IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) .values( metadata_location=updated_staged_table.metadata_location, @@ -481,6 +506,7 @@ def commit_table( IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), ) .one() ) @@ -499,6 +525,7 @@ def commit_table( table_name=table_name, metadata_location=updated_staged_table.metadata_location, previous_metadata_location=None, + iceberg_type=ICEBERG_TABLE_TYPE, ) ) session.commit() @@ -615,7 +642,11 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace = Catalog.namespace_to_string(namespace) - stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) + stmt = select(IcebergTables).where( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == namespace, + or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)), + ) with Session(self.engine) as session: result = session.scalars(stmt) return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index f6846195fe..eeb3952688 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -27,6 +27,7 @@ from pyiceberg.catalog.sql import ( DEFAULT_ECHO_VALUE, DEFAULT_POOL_PRE_PING_VALUE, + ICEBERG_TABLE_TYPE, IcebergTables, SqlCatalog, SqlCatalogBaseTable, @@ -261,3 +262,78 @@ def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> N # Second close should not raise an exception catalog_sqlite.close() + + +class TestIcebergTypeFilter: + """Verify that table operations filter on iceberg_type and ignore view rows.""" + + def _insert_view_row(self, catalog: SqlCatalog, namespace: str, name: str) -> None: + """Directly insert a row with iceberg_type='VIEW' to simulate a view written by another engine.""" + from sqlalchemy.orm import Session + + with Session(catalog.engine) as session: + session.add( + IcebergTables( + catalog_name=catalog.name, + table_namespace=namespace, + table_name=name, + metadata_location=None, + previous_metadata_location=None, + iceberg_type="VIEW", + ) + ) + session.commit() + + def test_iceberg_type_set_on_create(self, catalog_memory: SqlCatalog) -> None: + """Tables created by SqlCatalog should have iceberg_type='TABLE'.""" + from sqlalchemy import select + from sqlalchemy.orm import Session + + catalog_memory.create_namespace("iceberg_type_ns") + schema = Schema(NestedField(1, "id", StringType(), required=True)) + catalog_memory.create_table("iceberg_type_ns.tbl_type_check", schema) + + with Session(catalog_memory.engine) as session: + row = session.scalar( + select(IcebergTables).where( + IcebergTables.catalog_name == catalog_memory.name, + IcebergTables.table_namespace == "iceberg_type_ns", + IcebergTables.table_name == "tbl_type_check", + ) + ) + assert row is not None + assert row.iceberg_type == ICEBERG_TABLE_TYPE + + def test_list_tables_excludes_view_rows(self, catalog_memory: SqlCatalog) -> None: + """list_tables must not return rows with iceberg_type='VIEW'.""" + catalog_memory.create_namespace("view_filter_ns") + self._insert_view_row(catalog_memory, "view_filter_ns", "my_view") + tables = catalog_memory.list_tables("view_filter_ns") + assert ("view_filter_ns", "my_view") not in tables + + def test_load_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None: + """load_table must raise NoSuchTableError for rows with iceberg_type='VIEW'.""" + from pyiceberg.exceptions import NoSuchTableError + + catalog_memory.create_namespace("load_view_ns") + self._insert_view_row(catalog_memory, "load_view_ns", "a_view") + with pytest.raises(NoSuchTableError): + catalog_memory.load_table("load_view_ns.a_view") + + def test_drop_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None: + """drop_table must raise NoSuchTableError for rows with iceberg_type='VIEW'.""" + from pyiceberg.exceptions import NoSuchTableError + + catalog_memory.create_namespace("drop_view_ns") + self._insert_view_row(catalog_memory, "drop_view_ns", "droppable_view") + with pytest.raises(NoSuchTableError): + catalog_memory.drop_table("drop_view_ns.droppable_view") + + def test_rename_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None: + """rename_table must raise NoSuchTableError for rows with iceberg_type='VIEW'.""" + from pyiceberg.exceptions import NoSuchTableError + + catalog_memory.create_namespace("rename_view_ns") + self._insert_view_row(catalog_memory, "rename_view_ns", "renamed_view") + with pytest.raises(NoSuchTableError): + catalog_memory.rename_table("rename_view_ns.renamed_view", "rename_view_ns.new_name")