Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
create_engine,
delete,
insert,
or_,
select,
text,
union,
update,
)
Expand Down Expand Up @@ -85,6 +87,9 @@ class SqlCatalogBaseTable(MappedAsDataclass, DeclarativeBase):
pass


ICEBERG_TABLE_TYPE = "TABLE"


class IcebergTables(SqlCatalogBaseTable):
__tablename__ = "iceberg_tables"

Expand All @@ -93,6 +98,7 @@ class IcebergTables(SqlCatalogBaseTable):
table_name: Mapped[str] = mapped_column(String(255), nullable=False, primary_key=True)
metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
previous_metadata_location: Mapped[str | None] = mapped_column(String(1000), nullable=True)
iceberg_type: Mapped[str | None] = mapped_column(String(255), nullable=True)


class IcebergNamespaceProperties(SqlCatalogBaseTable):
Expand Down Expand Up @@ -147,6 +153,17 @@ def _ensure_tables_exist(self) -> None:
self.create_tables()
return

# Idempotently add iceberg_type column if it does not exist yet
# (backward-compatible migration for databases created before this column was introduced).
# Older SQLite versions do not support "ADD COLUMN IF NOT EXISTS", so we catch and ignore
# the error if the column is already present.
with Session(self.engine) as session:
try:
session.execute(text("ALTER TABLE iceberg_tables ADD COLUMN iceberg_type VARCHAR(255)"))
session.commit()
except (OperationalError, ProgrammingError):
pass # column already exists

def create_tables(self) -> None:
SqlCatalogBaseTable.metadata.create_all(self.engine)

Expand Down Expand Up @@ -231,6 +248,7 @@ def create_table(
table_name=table_name,
metadata_location=metadata_location,
previous_metadata_location=None,
iceberg_type=ICEBERG_TABLE_TYPE,
)
)
session.commit()
Expand Down Expand Up @@ -273,6 +291,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o
table_name=table_name,
metadata_location=metadata_location,
previous_metadata_location=None,
iceberg_type=ICEBERG_TABLE_TYPE,
)
)
session.commit()
Expand Down Expand Up @@ -305,6 +324,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
result = session.scalar(stmt)
if result:
Expand All @@ -331,6 +351,7 @@ def drop_table(self, identifier: str | Identifier) -> None:
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
)
if res.rowcount < 1:
Expand All @@ -344,6 +365,7 @@ def drop_table(self, identifier: str | Identifier) -> None:
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
.one()
)
Expand Down Expand Up @@ -385,6 +407,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == from_namespace,
IcebergTables.table_name == from_table_name,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
.values(table_namespace=to_namespace, table_name=to_table_name)
)
Expand All @@ -400,6 +423,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == from_namespace,
IcebergTables.table_name == from_table_name,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
.one()
)
Expand Down Expand Up @@ -462,6 +486,7 @@ def commit_table(
IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
IcebergTables.metadata_location == current_table.metadata_location,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
.values(
metadata_location=updated_staged_table.metadata_location,
Expand All @@ -481,6 +506,7 @@ def commit_table(
IcebergTables.table_namespace == namespace,
IcebergTables.table_name == table_name,
IcebergTables.metadata_location == current_table.metadata_location,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
.one()
)
Expand All @@ -499,6 +525,7 @@ def commit_table(
table_name=table_name,
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=None,
iceberg_type=ICEBERG_TABLE_TYPE,
)
)
session.commit()
Expand Down Expand Up @@ -615,7 +642,11 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

namespace = Catalog.namespace_to_string(namespace)
stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace)
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
IcebergTables.table_namespace == namespace,
or_(IcebergTables.iceberg_type == ICEBERG_TABLE_TYPE, IcebergTables.iceberg_type.is_(None)),
)
with Session(self.engine) as session:
result = session.scalars(stmt)
return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result]
Expand Down
76 changes: 76 additions & 0 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from pyiceberg.catalog.sql import (
DEFAULT_ECHO_VALUE,
DEFAULT_POOL_PRE_PING_VALUE,
ICEBERG_TABLE_TYPE,
IcebergTables,
SqlCatalog,
SqlCatalogBaseTable,
Expand Down Expand Up @@ -261,3 +262,78 @@ def test_sql_catalog_multiple_close_calls(self, catalog_sqlite: SqlCatalog) -> N

# Second close should not raise an exception
catalog_sqlite.close()


class TestIcebergTypeFilter:
"""Verify that table operations filter on iceberg_type and ignore view rows."""

def _insert_view_row(self, catalog: SqlCatalog, namespace: str, name: str) -> None:
"""Directly insert a row with iceberg_type='VIEW' to simulate a view written by another engine."""
from sqlalchemy.orm import Session

with Session(catalog.engine) as session:
session.add(
IcebergTables(
catalog_name=catalog.name,
table_namespace=namespace,
table_name=name,
metadata_location=None,
previous_metadata_location=None,
iceberg_type="VIEW",
)
)
session.commit()

def test_iceberg_type_set_on_create(self, catalog_memory: SqlCatalog) -> None:
"""Tables created by SqlCatalog should have iceberg_type='TABLE'."""
from sqlalchemy import select
from sqlalchemy.orm import Session

catalog_memory.create_namespace("iceberg_type_ns")
schema = Schema(NestedField(1, "id", StringType(), required=True))
catalog_memory.create_table("iceberg_type_ns.tbl_type_check", schema)

with Session(catalog_memory.engine) as session:
row = session.scalar(
select(IcebergTables).where(
IcebergTables.catalog_name == catalog_memory.name,
IcebergTables.table_namespace == "iceberg_type_ns",
IcebergTables.table_name == "tbl_type_check",
)
)
assert row is not None
assert row.iceberg_type == ICEBERG_TABLE_TYPE

def test_list_tables_excludes_view_rows(self, catalog_memory: SqlCatalog) -> None:
"""list_tables must not return rows with iceberg_type='VIEW'."""
catalog_memory.create_namespace("view_filter_ns")
self._insert_view_row(catalog_memory, "view_filter_ns", "my_view")
tables = catalog_memory.list_tables("view_filter_ns")
assert ("view_filter_ns", "my_view") not in tables

def test_load_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
"""load_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
from pyiceberg.exceptions import NoSuchTableError

catalog_memory.create_namespace("load_view_ns")
self._insert_view_row(catalog_memory, "load_view_ns", "a_view")
with pytest.raises(NoSuchTableError):
catalog_memory.load_table("load_view_ns.a_view")

def test_drop_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
"""drop_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
from pyiceberg.exceptions import NoSuchTableError

catalog_memory.create_namespace("drop_view_ns")
self._insert_view_row(catalog_memory, "drop_view_ns", "droppable_view")
with pytest.raises(NoSuchTableError):
catalog_memory.drop_table("drop_view_ns.droppable_view")

def test_rename_table_ignores_view_rows(self, catalog_memory: SqlCatalog) -> None:
"""rename_table must raise NoSuchTableError for rows with iceberg_type='VIEW'."""
from pyiceberg.exceptions import NoSuchTableError

catalog_memory.create_namespace("rename_view_ns")
self._insert_view_row(catalog_memory, "rename_view_ns", "renamed_view")
with pytest.raises(NoSuchTableError):
catalog_memory.rename_table("rename_view_ns.renamed_view", "rename_view_ns.new_name")