21 changes: 16 additions & 5 deletions src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp
@@ -127,7 +127,7 @@ NamesAndTypesList SchemaConverter::inferSchema()
return res;
}

std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const
std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const
{
if (!column_mapper)
return element.name;
@@ -142,8 +142,19 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
auto it = map.find(element.field_id);
if (it == map.end())
throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true);
return split.second.empty() ? split.first : split.second;

/// At top level (empty path), return the full mapped name. For nested
/// elements, strip the parent path prefix to get the child name.
if (current_path.empty())
return it->second;

/// Strip "current_path." prefix to get child name (preserves dots in child names)
std::string_view mapped = it->second;
if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
&& mapped[current_path.size()] == '.')
return mapped.substr(current_path.size() + 1);
Review comment on lines +151 to +155:

P2: Normalize case before prefix stripping

When case_insensitive_column_matching is enabled, node.name is rewritten to the query's casing in processSubtree, so current_path can differ in case from the Iceberg-mapped name. The starts_with check here is case-sensitive; if the user queries a tuple/struct column with different case, the prefix won't be stripped and child names get built with the full mapped path (e.g. mystruct.MyStruct.child), which then fails tryGetPositionByName and can yield missing tuple elements or errors. Consider normalizing current_path/mapped with the same case-insensitive rules before comparing.
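To make the suggestion concrete, a minimal standalone sketch of a case-insensitive prefix check, assuming plain ASCII lowercasing; the helper name and the lowering rule are illustrative only and may not match how case_insensitive_column_matching actually normalizes names:

#include <algorithm>
#include <cctype>
#include <string_view>

/// Hypothetical helper: does `mapped` start with `path` followed by a dot, ignoring ASCII case?
static bool startsWithPathCaseInsensitive(std::string_view mapped, std::string_view path)
{
    if (mapped.size() <= path.size() || mapped[path.size()] != '.')
        return false;
    return std::equal(path.begin(), path.end(), mapped.begin(),
        [](unsigned char a, unsigned char b) { return std::tolower(a) == std::tolower(b); });
}

/// With such a check, stripping would work even when the query's casing differs from the mapped name:
///   current_path = "MyStruct", mapped = "mystruct.weird.field"  ->  "weird.field"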



return mapped;
}
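For reference, a self-contained model of the old and new behaviour. lastComponent and childName below are illustrative stand-ins, written under the assumption that Nested::splitName(..., reverse=true) splits at the last dot, which is what the removed branch effectively returned:

#include <cassert>
#include <string_view>

/// Model of the removed logic: keep only the part after the last dot.
static std::string_view lastComponent(std::string_view mapped)
{
    auto pos = mapped.rfind('.');
    return pos == std::string_view::npos ? mapped : mapped.substr(pos + 1);
}

/// Model of the new logic: return the full name at top level, otherwise strip "current_path.".
static std::string_view childName(std::string_view current_path, std::string_view mapped)
{
    if (current_path.empty())
        return mapped;
    if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
        && mapped[current_path.size()] == '.')
        return mapped.substr(current_path.size() + 1);
    return mapped;
}

int main()
{
    /// Top-level Iceberg column whose name itself contains a dot.
    assert(childName("", "integer.col") == "integer.col");

    /// Tuple element "weird.field" inside "my_struct": the old logic lost the "weird." part.
    assert(lastComponent("my_struct.weird.field") == "field");
    assert(childName("my_struct", "my_struct.weird.field") == "weird.field");
}

The new tests below (test_nested_struct_with_dotted_field and test_deeply_nested_struct_with_dotted_names) exercise exactly this difference.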

void SchemaConverter::processSubtree(TraversalNode & node)
@@ -160,7 +171,7 @@ void SchemaConverter::processSubtree(TraversalNode & node)

if (node.schema_context == SchemaContext::None)
{
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element));
node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element, node.name));

if (sample_block)
{
@@ -589,7 +600,7 @@ void SchemaConverter::processSubtreeTuple(TraversalNode & node)
std::vector<String> element_names_in_file;
for (size_t i = 0; i < size_t(node.element->num_children); ++i)
{
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx)));
const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx), node.name));
std::optional<size_t> idx_in_output_tuple = i - skipped_unsupported_columns;
if (lookup_by_name)
{
6 changes: 4 additions & 2 deletions src/Processors/Formats/Impl/Parquet/SchemaConverter.h
@@ -137,8 +137,10 @@ struct SchemaConverter
DataTypePtr & out_inferred_type, std::optional<GeoColumnMetadata> geo_metadata) const;

/// Returns element.name or a corresponding name from ColumnMapper.
/// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`.
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const;
/// For nested tuple elements, returns just the element name like `x`, not the whole path like `t.x`.
/// For top-level columns (when current_path is empty), returns the full mapped name to support
/// column names with dots (e.g., `integer.col` in Iceberg).
std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const;
};

}
@@ -43,6 +43,7 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key))
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
, file_format(data_manifest_file_entry_.file_format)
, sequence_number(data_manifest_file_entry_.added_sequence_number)
{
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
@@ -59,10 +60,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
ObjectStoragePtr resolved_storage,
const String & resolved_key)
: PathWithMetadata(resolved_key, std::nullopt,
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
resolved_storage)
, data_object_file_path_key(data_manifest_file_entry_.file_path_key)
, underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
, file_format(data_manifest_file_entry_.file_format)
, sequence_number(data_manifest_file_entry_.added_sequence_number)
{
if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
@@ -22,7 +22,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
/// It is used to filter position deletes objects by data file path.
/// It is also used to create a filter for the data object in the position delete transform.
explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);

/// Sometimes data files are located outside the table location and even in a different storage.
explicit IcebergDataObjectInfo(
Iceberg::ManifestFileEntry data_manifest_file_entry_,
@@ -50,6 +50,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_

String data_object_file_path_key; // Full path to the data object file
Int32 underlying_format_read_schema_id;
String file_format; // Format of the data file (e.g., "PARQUET", "ORC", "AVRO")
std::vector<Iceberg::PositionDeleteObject> position_deletes_objects;
std::vector<Iceberg::ManifestFileEntry> equality_deletes_objects;
Int64 sequence_number;
@@ -1077,7 +1077,7 @@ void IcebergMetadata::addDeleteTransformers(

auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context);

PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
{
auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
@@ -1198,8 +1198,7 @@ ColumnMapperPtr IcebergMetadata::getColumnMapperForObject(ObjectInfoPtr object_i
IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
if (!iceberg_object_info)
return nullptr;
auto configuration_ptr = configuration.lock();
if (Poco::toLower(configuration_ptr->getFormat()) != "parquet")
if (Poco::toLower(iceberg_object_info->file_format) != "parquet")
return nullptr;

return persistent_components.schema_processor->getColumnMapperById(iceberg_object_info->underlying_format_read_schema_id);
204 changes: 204 additions & 0 deletions tests/integration/test_storage_iceberg/test.py
@@ -4031,3 +4031,207 @@ def get_buffers_count(func):
buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
assert buffers_count_with_splitted_tasks > buffers_count_default


@pytest.mark.parametrize("storage_type", ["s3"])
def test_column_names_with_dots(started_cluster, storage_type):
"""
Test that Iceberg tables whose column names contain dots are read correctly.
This tests the fix for field ID-based column name mapping in the Parquet V3 reader.
"""
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = "test_column_names_with_dots_" + storage_type + "_" + get_uuid_str()

# Create DataFrame with column names containing dots
data = [
(1, "value1", "multi_dot_value1"),
(2, "value2", "multi_dot_value2"),
(3, "value3", "multi_dot_value3"),
]
schema = StructType(
[
StructField("id", IntegerType()),
StructField("name.column", StringType()),
StructField("double.column.dot", StringType()),
]
)
df = spark.createDataFrame(data=data, schema=schema)

write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")

default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)

# Test via table function
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)

# Verify single-dot column name
result = instance.query(
f"SELECT `name.column` FROM {table_function_expr} ORDER BY id"
).strip()
assert result == "value1\nvalue2\nvalue3", f"Expected values, got: {result}"

# Verify multi-dot column name
result = instance.query(
f"SELECT `double.column.dot` FROM {table_function_expr} ORDER BY id"
).strip()
assert result == "multi_dot_value1\nmulti_dot_value2\nmulti_dot_value3", f"Expected values, got: {result}"

# Verify all columns together
result = instance.query(
f"SELECT id, `name.column`, `double.column.dot` FROM {table_function_expr} ORDER BY id"
).strip()
expected = "1\tvalue1\tmulti_dot_value1\n2\tvalue2\tmulti_dot_value2\n3\tvalue3\tmulti_dot_value3"
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"

# Test via table engine
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)

result = instance.query(
f"SELECT `name.column`, `double.column.dot` FROM {TABLE_NAME} ORDER BY id"
).strip()
expected = "value1\tmulti_dot_value1\nvalue2\tmulti_dot_value2\nvalue3\tmulti_dot_value3"
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_nested_struct_with_dotted_field(started_cluster, storage_type):
"""
Test that nested struct fields whose names contain dots are read correctly.
This tests that useColumnMapperIfNeeded strips the parent-path prefix instead of relying on splitName.
E.g., for my_struct.weird.field, it should return "weird.field", not just "field".
"""
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = "test_nested_struct_dotted_field_" + storage_type + "_" + get_uuid_str()

# Create DataFrame with nested struct containing a dotted field name
nested_schema = StructType(
[
StructField("id", IntegerType()),
StructField(
"my_struct",
StructType(
[
StructField("normal_field", IntegerType()),
StructField("weird.field", StringType()),
]
),
),
]
)

data = [
(1, (100, "nested_dot_value1")),
(2, (200, "nested_dot_value2")),
(3, (300, "nested_dot_value3")),
]
df = spark.createDataFrame(data=data, schema=nested_schema)

write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")

default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)

# Test via table function
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)

# Verify nested struct with dotted field via table function
result = instance.query(
f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {table_function_expr} ORDER BY id"
).strip()
expected = "100\tnested_dot_value1\n200\tnested_dot_value2\n300\tnested_dot_value3"
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"

# Test via table engine
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)

result = instance.query(
f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {TABLE_NAME} ORDER BY id"
).strip()
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"


@pytest.mark.parametrize("storage_type", ["s3"])
def test_deeply_nested_struct_with_dotted_names(started_cluster, storage_type):
"""
Test deeply nested structs where EVERY level has dots in the name.
Structure: my.struct -> some_dot.separated_parent -> weird.field
Full path: my.struct.some_dot.separated_parent.weird.field

This verifies that prefix stripping works correctly at all nesting depths.
"""
instance = started_cluster.instances["node1"]
spark = started_cluster.spark_session
TABLE_NAME = "test_deeply_nested_dotted_" + storage_type + "_" + get_uuid_str()

# Create 3-level nested schema with dots at every level
nested_schema = StructType(
[
StructField("id", IntegerType()),
StructField(
"my.struct",
StructType(
[
StructField(
"some_dot.separated_parent",
StructType(
[
StructField("weird.field", StringType()),
]
),
),
]
),
),
]
)

data = [
(1, (("deep_value1",),)),
(2, (("deep_value2",),)),
(3, (("deep_value3",),)),
]
df = spark.createDataFrame(data=data, schema=nested_schema)

write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")

default_upload_directory(
started_cluster,
storage_type,
f"/iceberg_data/default/{TABLE_NAME}/",
f"/iceberg_data/default/{TABLE_NAME}/",
)

# Test via table function
table_function_expr = get_creation_expression(
storage_type, TABLE_NAME, started_cluster, table_function=True
)

# Query the deeply nested dotted field
result = instance.query(
f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {table_function_expr} ORDER BY id"
).strip()
expected = "deep_value1\ndeep_value2\ndeep_value3"
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"

# Test via table engine
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)

result = instance.query(
f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {TABLE_NAME} ORDER BY id"
).strip()
assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"