Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Comment thread
rluvaton marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,43 @@ async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_sort_with_limited_memory_and_oversized_record_batch() -> Result<()> {
let record_batch_size = 8192;
let pool_size = 2 * MB as usize;
let task_ctx = {
let memory_pool = Arc::new(FairSpillPool::new(pool_size));
TaskContext::default()
.with_session_config(
SessionConfig::new()
.with_batch_size(record_batch_size)
.with_sort_spill_reservation_bytes(1),
)
.with_runtime(Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.build()?,
))
};

// Each spilled run's largest batch is so big that two merge streams cannot be
// reserved at once even at the smallest read-buffer size (`2 * (2 * batch) >
// pool`), yet a single stream still fits (`2 * batch < pool`). Reducing the
// buffer size therefore cannot help, the multi-level merge has to re-spill a
// run with a smaller batch size to make progress instead of failing with
// `ResourcesExhausted`.
run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs {
pool_size,
task_ctx: Arc::new(task_ctx),
number_of_record_batches: 100,
get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 3),
memory_behavior: Default::default(),
})
.await?;

Ok(())
}

struct RunTestWithLimitedMemoryArgs {
pool_size: usize,
task_ctx: Arc<TaskContext>,
Expand Down
59 changes: 59 additions & 0 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,65 @@ async fn test_external_sort_zero_merge_reservation() {
assert!(spill_count > 0);
}

/// End-to-end (SQL-level) reproducer for the skewed-batch multi-level merge bug.
///
/// The workload is a sort over wide rows under a tight memory budget. Each spilled
/// run's largest record batch is so wide that two merge streams cannot both be
/// reserved at once (`~4 * max_batch > pool`), yet a single stream still fits
/// (`~2 * max_batch < pool`). Reducing the read-ahead buffer therefore cannot help.
///
/// Before the fix the multi-level merge gave up here with `ResourcesExhausted`; now
/// it re-spills the blocking run with a smaller batch size and the query completes.
///
/// This complements the low-level unit tests in `multi_level_merge.rs`: it drives the
/// whole sort -> spill -> multi-level-merge pipeline from a SQL query, so the coverage
/// survives refactors of the merge internals.
#[tokio::test]
async fn test_sort_skewed_batches_spill() {
let pool_size = 2 * 1024 * 1024; // 2MB
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(pool_size)))
.build_arc()
.unwrap();

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(1)
.with_batch_size(8192)
.with_target_partitions(1);

let ctx = SessionContext::new_with_config_rt(config, runtime);

// Each row carries a ~100-byte string payload, so a full 8192-row batch is
// ~0.9MB. Reserving two such streams needs ~4 * 0.9MB > 2MB and cannot fit,
// while a single stream (~1.8MB) still fits - exactly the skew the fix handles.
// Sorting by the narrow `v` key forces the wide payload to be carried through
// the spill/merge path.
let row_count = 131072;
let query = "SELECT v, repeat('a', 100) AS payload \
FROM generate_series(1, 131072) AS t(v) \
ORDER BY v DESC";
let df = ctx.sql(query).await.unwrap();

let physical_plan = df.create_physical_plan().await.unwrap();
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = physical_plan.execute(0, task_ctx).unwrap();
let batches = collect(stream)
.await
.expect("skewed sort should re-spill and complete, not exhaust memory");

// Every input row must come out of the merge.
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, row_count);

// The query must actually spill, otherwise it never reaches the merge path
// this test is meant to cover.
let metrics = physical_plan.metrics().unwrap();
assert!(
metrics.spill_count().unwrap() > 0,
"expected the sort to spill to disk"
);
}

// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
// ------------------------------------------------------------------

Expand Down
Loading
Loading