diff --git a/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp b/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp
index dc3948f1c185..59b21f7b442d 100644
--- a/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp
+++ b/src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp
@@ -127,7 +127,7 @@ NamesAndTypesList SchemaConverter::inferSchema()
     return res;
 }
 
-std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element) const
+std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const
 {
     if (!column_mapper)
         return element.name;
@@ -142,8 +142,19 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
     auto it = map.find(element.field_id);
     if (it == map.end())
         throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);
-    auto split = Nested::splitName(std::string_view(it->second), /*reverse=*/ true);
-    return split.second.empty() ? split.first : split.second;
+
+    /// At top level (empty path), return the full mapped name. For nested
+    /// elements, strip the parent path prefix to get the child name.
+    if (current_path.empty())
+        return it->second;
+
+    /// Strip the "current_path." prefix to get the child name (preserves dots in child names).
+    std::string_view mapped = it->second;
+    if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
+        && mapped[current_path.size()] == '.')
+        return mapped.substr(current_path.size() + 1);
+
+    return mapped;
 }
 
 void SchemaConverter::processSubtree(TraversalNode & node)
@@ -160,7 +171,7 @@ void SchemaConverter::processSubtree(TraversalNode & node)
 
     if (node.schema_context == SchemaContext::None)
     {
-        node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element));
+        node.appendNameComponent(node.element->name, useColumnMapperIfNeeded(*node.element, node.name));
 
         if (sample_block)
         {
@@ -589,7 +600,7 @@ void SchemaConverter::processSubtreeTuple(TraversalNode & node)
     std::vector<String> element_names_in_file;
     for (size_t i = 0; i < size_t(node.element->num_children); ++i)
     {
-        const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx)));
+        const String & element_name = element_names_in_file.emplace_back(useColumnMapperIfNeeded(file_metadata.schema.at(schema_idx), node.name));
         std::optional<size_t> idx_in_output_tuple = i - skipped_unsupported_columns;
         if (lookup_by_name)
         {
diff --git a/src/Processors/Formats/Impl/Parquet/SchemaConverter.h b/src/Processors/Formats/Impl/Parquet/SchemaConverter.h
index e89e7f75d6f0..0eda419e6966 100644
--- a/src/Processors/Formats/Impl/Parquet/SchemaConverter.h
+++ b/src/Processors/Formats/Impl/Parquet/SchemaConverter.h
@@ -137,8 +137,10 @@ struct SchemaConverter
         DataTypePtr & out_inferred_type, std::optional geo_metadata) const;
 
     /// Returns element.name or a corresponding name from ColumnMapper.
-    /// For tuple elements, that's just the element name like `x`, not the whole path like `t.x`.
-    std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element) const;
+    /// For nested tuple elements, returns just the element name like `x`, not the whole path like `t.x`.
+    /// For top-level columns (when current_path is empty), returns the full mapped name to support
+    /// column names with dots (e.g., `integer.col` in Iceberg).
+    std::string_view useColumnMapperIfNeeded(const parq::SchemaElement & element, const String & current_path) const;
 };
 
 }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp
index 057a95cfbf7a..62c142d97501 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.cpp
@@ -43,6 +43,7 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_man
         data_manifest_file_entry_.file_path_key.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path_key))
     , data_object_file_path_key(data_manifest_file_entry_.file_path_key)
     , underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
+    , file_format(data_manifest_file_entry_.file_format)
     , sequence_number(data_manifest_file_entry_.added_sequence_number)
 {
     if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
@@ -59,10 +60,11 @@ IcebergDataObjectInfo::IcebergDataObjectInfo(
     ObjectStoragePtr resolved_storage,
     const String & resolved_key)
     : PathWithMetadata(resolved_key, std::nullopt,
-        data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
+        data_manifest_file_entry_.file_path.empty() ? std::nullopt : std::make_optional(data_manifest_file_entry_.file_path),
         resolved_storage)
     , data_object_file_path_key(data_manifest_file_entry_.file_path_key)
     , underlying_format_read_schema_id(data_manifest_file_entry_.schema_id)
+    , file_format(data_manifest_file_entry_.file_format)
     , sequence_number(data_manifest_file_entry_.added_sequence_number)
 {
     if (!position_deletes_objects.empty() && Poco::toUpperInPlace(data_manifest_file_entry_.file_format) != "PARQUET")
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h
index e5e8e1b6cc09..2d31447e9d39 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h
@@ -22,7 +22,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
     /// It is used to filter position deletes objects by data file path.
     /// It is also used to create a filter for the data object in the position delete transform.
     explicit IcebergDataObjectInfo(Iceberg::ManifestFileEntry data_manifest_file_entry_);
-    
+
     /// Sometimes data files are located outside the table location and even in a different storage.
     explicit IcebergDataObjectInfo(
         Iceberg::ManifestFileEntry data_manifest_file_entry_,
@@ -50,6 +50,7 @@ struct IcebergDataObjectInfo : public PathWithMetadata, std::enable_shared_from_
 
     String data_object_file_path_key; // Full path to the data object file
     Int32 underlying_format_read_schema_id;
+    String file_format; // Format of the data file (e.g., "PARQUET", "ORC", "AVRO")
     std::vector position_deletes_objects;
     std::vector equality_deletes_objects;
     Int64 sequence_number;
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index 27249f4bb0f8..bb401a2d418a 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -1077,7 +1077,7 @@ void IcebergMetadata::addDeleteTransformers(
 
     auto [delete_storage_to_use, resolved_delete_key] = resolveObjectStorageForPath(
         persistent_components.table_location, delete_file.file_path, object_storage, *secondary_storages, local_context);
-    
+
     PathWithMetadata delete_file_object(resolved_delete_key, std::nullopt, delete_file.file_path, delete_storage_to_use);
     {
         auto schema_read_buffer = createReadBuffer(delete_file_object, delete_storage_to_use, local_context, log);
@@ -1198,8 +1198,7 @@ ColumnMapperPtr IcebergMetadata::getColumnMapperForObject(ObjectInfoPtr object_i
     IcebergDataObjectInfo * iceberg_object_info = dynamic_cast<IcebergDataObjectInfo *>(object_info.get());
     if (!iceberg_object_info)
         return nullptr;
-    auto configuration_ptr = configuration.lock();
-    if (Poco::toLower(configuration_ptr->getFormat()) != "parquet")
+    if (Poco::toLower(iceberg_object_info->file_format) != "parquet")
         return nullptr;
 
     return persistent_components.schema_processor->getColumnMapperById(iceberg_object_info->underlying_format_read_schema_id);
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 3cfb9e9d053d..39057083ce16 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -4031,3 +4031,207 @@ def get_buffers_count(func):
     buffers_count_with_splitted_tasks = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3},cluster_table_function_split_granularity='bucket', cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
     buffers_count_default = get_buffers_count(lambda: instance.query(f"SELECT * FROM {table_function_expr_cluster} ORDER BY ALL SETTINGS input_format_parquet_use_native_reader_v3={input_format_parquet_use_native_reader_v3}, cluster_table_function_buckets_batch_size={cluster_table_function_buckets_batch_size}").strip().split())
     assert buffers_count_with_splitted_tasks > buffers_count_default
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_column_names_with_dots(started_cluster, storage_type):
+    """
+    Test that Iceberg tables with dot-separated column names are read correctly.
+    This tests the fix for field-ID-based column name mapping in the Parquet V3 reader.
+    """
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_column_names_with_dots_" + storage_type + "_" + get_uuid_str()
+
+    # Create DataFrame with column names containing dots
+    data = [
+        (1, "value1", "multi_dot_value1"),
+        (2, "value2", "multi_dot_value2"),
+        (3, "value3", "multi_dot_value3"),
+    ]
+    schema = StructType(
+        [
+            StructField("id", IntegerType()),
+            StructField("name.column", StringType()),
+            StructField("double.column.dot", StringType()),
+        ]
+    )
+    df = spark.createDataFrame(data=data, schema=schema)
+
+    write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
+
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    # Test via table function
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+
+    # Verify single-dot column name
+    result = instance.query(
+        f"SELECT `name.column` FROM {table_function_expr} ORDER BY id"
+    ).strip()
+    assert result == "value1\nvalue2\nvalue3", f"Expected values, got: {result}"
+
+    # Verify multi-dot column name
+    result = instance.query(
+        f"SELECT `double.column.dot` FROM {table_function_expr} ORDER BY id"
+    ).strip()
+    assert result == "multi_dot_value1\nmulti_dot_value2\nmulti_dot_value3", f"Expected values, got: {result}"
+
+    # Verify all columns together
+    result = instance.query(
+        f"SELECT id, `name.column`, `double.column.dot` FROM {table_function_expr} ORDER BY id"
+    ).strip()
+    expected = "1\tvalue1\tmulti_dot_value1\n2\tvalue2\tmulti_dot_value2\n3\tvalue3\tmulti_dot_value3"
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
+
+    # Test via table engine
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+
+    result = instance.query(
+        f"SELECT `name.column`, `double.column.dot` FROM {TABLE_NAME} ORDER BY id"
+    ).strip()
+    expected = "value1\tmulti_dot_value1\nvalue2\tmulti_dot_value2\nvalue3\tmulti_dot_value3"
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_nested_struct_with_dotted_field(started_cluster, storage_type):
+    """
+    Test that nested struct fields with dot-separated names are read correctly.
+    This tests the fix for prefix stripping (not splitName) in useColumnMapperIfNeeded.
+    E.g., for my_struct.weird.field, we should return "weird.field" not just "field".
+    """
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_nested_struct_dotted_field_" + storage_type + "_" + get_uuid_str()
+
+    # Create DataFrame with nested struct containing a dotted field name
+    nested_schema = StructType(
+        [
+            StructField("id", IntegerType()),
+            StructField(
+                "my_struct",
+                StructType(
+                    [
+                        StructField("normal_field", IntegerType()),
+                        StructField("weird.field", StringType()),
+                    ]
+                ),
+            ),
+        ]
+    )
+
+    data = [
+        (1, (100, "nested_dot_value1")),
+        (2, (200, "nested_dot_value2")),
+        (3, (300, "nested_dot_value3")),
+    ]
+    df = spark.createDataFrame(data=data, schema=nested_schema)
+
+    write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
+
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    # Test via table function
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+
+    # Verify nested struct with dotted field via table function
+    result = instance.query(
+        f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {table_function_expr} ORDER BY id"
+    ).strip()
+    expected = "100\tnested_dot_value1\n200\tnested_dot_value2\n300\tnested_dot_value3"
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
+
+    # Test via table engine
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+
+    result = instance.query(
+        f"SELECT my_struct.normal_field, `my_struct.weird.field` FROM {TABLE_NAME} ORDER BY id"
+    ).strip()
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
+
+
+@pytest.mark.parametrize("storage_type", ["s3"])
+def test_deeply_nested_struct_with_dotted_names(started_cluster, storage_type):
+    """
+    Test deeply nested structs where EVERY level has dots in the name.
+    Structure: my.struct -> some_dot.separated_parent -> weird.field
+    Full path: my.struct.some_dot.separated_parent.weird.field
+
+    This verifies that prefix stripping works correctly at all nesting depths.
+    """
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_deeply_nested_dotted_" + storage_type + "_" + get_uuid_str()
+
+    # Create 3-level nested schema with dots at every level
+    nested_schema = StructType(
+        [
+            StructField("id", IntegerType()),
+            StructField(
+                "my.struct",
+                StructType(
+                    [
+                        StructField(
+                            "some_dot.separated_parent",
+                            StructType(
+                                [
+                                    StructField("weird.field", StringType()),
+                                ]
+                            ),
+                        ),
+                    ]
+                ),
+            ),
+        ]
+    )
+
+    data = [
+        (1, (("deep_value1",),)),
+        (2, (("deep_value2",),)),
+        (3, (("deep_value3",),)),
+    ]
+    df = spark.createDataFrame(data=data, schema=nested_schema)
+
+    write_iceberg_from_df(spark, df, TABLE_NAME, mode="overwrite", format_version="2")
+
+    default_upload_directory(
+        started_cluster,
+        storage_type,
+        f"/iceberg_data/default/{TABLE_NAME}/",
+        f"/iceberg_data/default/{TABLE_NAME}/",
+    )
+
+    # Test via table function
+    table_function_expr = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True
+    )
+
+    # Query the deeply nested dotted field
+    result = instance.query(
+        f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {table_function_expr} ORDER BY id"
+    ).strip()
+    expected = "deep_value1\ndeep_value2\ndeep_value3"
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
+
+    # Test via table engine
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster)
+
+    result = instance.query(
+        f"SELECT `my.struct.some_dot.separated_parent.weird.field` FROM {TABLE_NAME} ORDER BY id"
+    ).strip()
+    assert result == expected, f"Expected:\n{expected}\nGot:\n{result}"
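Illustrative sketch (outside the patch): a minimal, standalone approximation of the prefix-stripping rule that useColumnMapperIfNeeded now applies to ColumnMapper names, assuming C++20 for std::string_view::starts_with. The free function stripParentPrefix and its bare string_view signature are hypothetical stand-ins; the real member also performs the field_id lookup and uses ClickHouse's String type.

// Hypothetical helper mirroring the new prefix-stripping behaviour; not ClickHouse code.
#include <cassert>
#include <string_view>

std::string_view stripParentPrefix(std::string_view mapped, std::string_view current_path)
{
    // Top level (empty path): keep the full mapped name, so dotted column names survive intact.
    if (current_path.empty())
        return mapped;

    // Nested level: drop the leading "current_path." while preserving dots inside the child name.
    if (mapped.starts_with(current_path) && mapped.size() > current_path.size()
        && mapped[current_path.size()] == '.')
        return mapped.substr(current_path.size() + 1);

    return mapped;
}

int main()
{
    // Top-level column whose name itself contains a dot.
    assert(stripParentPrefix("name.column", "") == "name.column");
    // Nested field: only the parent path is stripped; dots in the child name remain.
    assert(stripParentPrefix("my_struct.weird.field", "my_struct") == "weird.field");
    // Deep nesting where every level contains dots (matches the last integration test).
    assert(stripParentPrefix("my.struct.some_dot.separated_parent.weird.field",
                             "my.struct.some_dot.separated_parent") == "weird.field");
}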