Skip to content

Commit 6bda658

Browse files
authored
Add case-sensitive support for equality deletes in DeleteFilter (#1930)
1 parent b047baa commit 6bda658

File tree

6 files changed

+85
-3
lines changed

6 files changed

+85
-3
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,7 @@ mod tests {
911911
partition: None,
912912
partition_spec: None,
913913
name_mapping: None,
914+
case_sensitive: false,
914915
};
915916

916917
// Load the deletes - should handle both types without error

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ impl DeleteFilter {
141141
return Ok(None);
142142
}
143143

144-
// TODO: handle case-insensitive case
145-
let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
144+
let bound_predicate = combined_predicate
145+
.bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?;
146146
Ok(Some(bound_predicate))
147147
}
148148

@@ -211,8 +211,9 @@ pub(crate) mod tests {
211211

212212
use super::*;
213213
use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
214+
use crate::expr::Reference;
214215
use crate::io::FileIO;
215-
use crate::spec::{DataFileFormat, Schema};
216+
use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type};
216217

217218
type ArrowSchemaRef = Arc<ArrowSchema>;
218219

@@ -344,6 +345,7 @@ pub(crate) mod tests {
344345
partition: None,
345346
partition_spec: None,
346347
name_mapping: None,
348+
case_sensitive: false,
347349
},
348350
FileScanTask {
349351
start: 0,
@@ -358,6 +360,7 @@ pub(crate) mod tests {
358360
partition: None,
359361
partition_spec: None,
360362
name_mapping: None,
363+
case_sensitive: false,
361364
},
362365
];
363366

@@ -380,4 +383,57 @@ pub(crate) mod tests {
380383
];
381384
Arc::new(arrow_schema::Schema::new(fields))
382385
}
386+
387+
#[tokio::test]
388+
async fn test_build_equality_delete_predicate_case_sensitive() {
389+
let schema = Arc::new(
390+
Schema::builder()
391+
.with_schema_id(1)
392+
.with_fields(vec![
393+
NestedField::required(1, "Id", Type::Primitive(PrimitiveType::Long)).into(),
394+
])
395+
.build()
396+
.unwrap(),
397+
);
398+
399+
// ---------- fake FileScanTask ----------
400+
let task = FileScanTask {
401+
start: 0,
402+
length: 0,
403+
record_count: None,
404+
data_file_path: "data.parquet".to_string(),
405+
data_file_format: crate::spec::DataFileFormat::Parquet,
406+
schema: schema.clone(),
407+
project_field_ids: vec![],
408+
predicate: None,
409+
deletes: vec![FileScanTaskDeleteFile {
410+
file_path: "eq-del.parquet".to_string(),
411+
file_type: DataContentType::EqualityDeletes,
412+
partition_spec_id: 0,
413+
equality_ids: None,
414+
}],
415+
partition: None,
416+
partition_spec: None,
417+
name_mapping: None,
418+
case_sensitive: true,
419+
};
420+
421+
let filter = DeleteFilter::default();
422+
423+
// ---------- insert equality delete predicate ----------
424+
let pred = Reference::new("id").equal_to(Datum::long(10));
425+
426+
let (tx, rx) = tokio::sync::oneshot::channel();
427+
filter.insert_equality_delete("eq-del.parquet", rx);
428+
429+
tx.send(pred).unwrap();
430+
431+
// ---------- should FAIL ----------
432+
let result = filter.build_equality_delete_predicate(&task).await;
433+
434+
assert!(
435+
result.is_err(),
436+
"case_sensitive=true should fail when column case mismatches"
437+
);
438+
}
383439
}

crates/iceberg/src/arrow/reader.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2082,6 +2082,7 @@ message schema {
20822082
partition: None,
20832083
partition_spec: None,
20842084
name_mapping: None,
2085+
case_sensitive: false,
20852086
})]
20862087
.into_iter(),
20872088
)) as FileScanTaskStream;
@@ -2403,6 +2404,7 @@ message schema {
24032404
partition: None,
24042405
partition_spec: None,
24052406
name_mapping: None,
2407+
case_sensitive: false,
24062408
};
24072409

24082410
// Task 2: read the second and third row groups
@@ -2419,6 +2421,7 @@ message schema {
24192421
partition: None,
24202422
partition_spec: None,
24212423
name_mapping: None,
2424+
case_sensitive: false,
24222425
};
24232426

24242427
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2546,6 +2549,7 @@ message schema {
25462549
partition: None,
25472550
partition_spec: None,
25482551
name_mapping: None,
2552+
case_sensitive: false,
25492553
})]
25502554
.into_iter(),
25512555
)) as FileScanTaskStream;
@@ -2717,6 +2721,7 @@ message schema {
27172721
partition: None,
27182722
partition_spec: None,
27192723
name_mapping: None,
2724+
case_sensitive: false,
27202725
};
27212726

27222727
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2934,6 +2939,7 @@ message schema {
29342939
partition: None,
29352940
partition_spec: None,
29362941
name_mapping: None,
2942+
case_sensitive: false,
29372943
};
29382944

29392945
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3144,6 +3150,7 @@ message schema {
31443150
partition: None,
31453151
partition_spec: None,
31463152
name_mapping: None,
3153+
case_sensitive: false,
31473154
};
31483155

31493156
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3247,6 +3254,7 @@ message schema {
32473254
partition: None,
32483255
partition_spec: None,
32493256
name_mapping: None,
3257+
case_sensitive: false,
32503258
})]
32513259
.into_iter(),
32523260
)) as FileScanTaskStream;
@@ -3344,6 +3352,7 @@ message schema {
33443352
partition: None,
33453353
partition_spec: None,
33463354
name_mapping: None,
3355+
case_sensitive: false,
33473356
})]
33483357
.into_iter(),
33493358
)) as FileScanTaskStream;
@@ -3430,6 +3439,7 @@ message schema {
34303439
partition: None,
34313440
partition_spec: None,
34323441
name_mapping: None,
3442+
case_sensitive: false,
34333443
})]
34343444
.into_iter(),
34353445
)) as FileScanTaskStream;
@@ -3530,6 +3540,7 @@ message schema {
35303540
partition: None,
35313541
partition_spec: None,
35323542
name_mapping: None,
3543+
case_sensitive: false,
35333544
})]
35343545
.into_iter(),
35353546
)) as FileScanTaskStream;
@@ -3659,6 +3670,7 @@ message schema {
36593670
partition: None,
36603671
partition_spec: None,
36613672
name_mapping: None,
3673+
case_sensitive: false,
36623674
})]
36633675
.into_iter(),
36643676
)) as FileScanTaskStream;
@@ -3755,6 +3767,7 @@ message schema {
37553767
partition: None,
37563768
partition_spec: None,
37573769
name_mapping: None,
3770+
case_sensitive: false,
37583771
})]
37593772
.into_iter(),
37603773
)) as FileScanTaskStream;
@@ -3864,6 +3877,7 @@ message schema {
38643877
partition: None,
38653878
partition_spec: None,
38663879
name_mapping: None,
3880+
case_sensitive: false,
38673881
})]
38683882
.into_iter(),
38693883
)) as FileScanTaskStream;
@@ -4003,6 +4017,7 @@ message schema {
40034017
partition: Some(partition_data),
40044018
partition_spec: Some(partition_spec),
40054019
name_mapping: None,
4020+
case_sensitive: false,
40064021
})]
40074022
.into_iter(),
40084023
)) as FileScanTaskStream;

crates/iceberg/src/scan/context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub(crate) struct ManifestFileContext {
4646
snapshot_schema: SchemaRef,
4747
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
4848
delete_file_index: DeleteFileIndex,
49+
case_sensitive: bool,
4950
}
5051

5152
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -59,6 +60,7 @@ pub(crate) struct ManifestEntryContext {
5960
pub partition_spec_id: i32,
6061
pub snapshot_schema: SchemaRef,
6162
pub delete_file_index: DeleteFileIndex,
63+
pub case_sensitive: bool,
6264
}
6365

6466
impl ManifestFileContext {
@@ -89,6 +91,7 @@ impl ManifestFileContext {
8991
bound_predicates: bound_predicates.clone(),
9092
snapshot_schema: snapshot_schema.clone(),
9193
delete_file_index: delete_file_index.clone(),
94+
case_sensitive: self.case_sensitive,
9295
};
9396

9497
sender
@@ -135,6 +138,7 @@ impl ManifestEntryContext {
135138
partition_spec: None,
136139
// TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
137140
name_mapping: None,
141+
case_sensitive: self.case_sensitive,
138142
})
139143
}
140144
}
@@ -277,6 +281,7 @@ impl PlanContext {
277281
field_ids: self.field_ids.clone(),
278282
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
279283
delete_file_index,
284+
case_sensitive: self.case_sensitive,
280285
}
281286
}
282287
}

crates/iceberg/src/scan/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1885,6 +1885,7 @@ pub mod tests {
18851885
partition: None,
18861886
partition_spec: None,
18871887
name_mapping: None,
1888+
case_sensitive: false,
18881889
};
18891890
test_fn(task);
18901891

@@ -1902,6 +1903,7 @@ pub mod tests {
19021903
partition: None,
19031904
partition_spec: None,
19041905
name_mapping: None,
1906+
case_sensitive: false,
19051907
};
19061908
test_fn(task);
19071909
}

crates/iceberg/src/scan/task.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ pub struct FileScanTask {
104104
#[serde(serialize_with = "serialize_not_implemented")]
105105
#[serde(deserialize_with = "deserialize_not_implemented")]
106106
pub name_mapping: Option<Arc<NameMapping>>,
107+
108+
/// Whether this scan task should treat column names as case-sensitive when binding predicates.
109+
pub case_sensitive: bool,
107110
}
108111

109112
impl FileScanTask {

0 commit comments

Comments
 (0)