Skip to content

Commit 64ec68f

Browse files
Allow InternalMerger::extract to yield when full buffer (#711)
* Allow InternalMerger::extract to yield when full buffer * Tweak implementations for clarity
1 parent 905537e commit 64ec68f

2 files changed

Lines changed: 59 additions & 20 deletions

File tree

differential-dataflow/examples/columnar/columnar_support.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,11 +648,18 @@ pub mod arrangement {
648648

649649
fn extract(
650650
&mut self,
651+
position: &mut usize,
651652
upper: AntichainRef<U::Time>,
652653
frontier: &mut Antichain<U::Time>,
653654
keep: &mut Self,
654655
ship: &mut Self,
655656
) {
657+
// `Updates::at_capacity` is a fixed-threshold check (not the
658+
// `len == capacity` quirk that Vec/TimelyStack have), so a
659+
// single-shot pass is fine here. Advance `*position` to the
660+
// end so the caller's `while position < len` loop exits.
661+
let _ = position;
662+
*position = self.diffs.values.len();
656663
let mut time = U::Time::default();
657664
for key_idx in 0 .. self.keys.values.len() {
658665
let key = self.keys.values.borrow().get(key_idx);

differential-dataflow/src/trace/implementations/merge_batcher.rs

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,22 @@ pub mod container {
269269

270270
/// Extract updates from `self` into `ship` (times not beyond `upper`)
271271
/// and `keep` (times beyond `upper`), updating `frontier` with kept times.
272+
///
273+
/// Iteration starts at `*position` and advances `*position` as updates
274+
/// are consumed. The implementation must yield (return early) when
275+
/// either `keep.at_capacity()` or `ship.at_capacity()` becomes true,
276+
/// so the caller can swap out a full output buffer and resume by
277+
/// calling `extract` again. The caller invokes `extract` repeatedly
278+
/// until `*position >= self.len()`.
279+
///
280+
/// This shape exists because `at_capacity()` for `Vec` and
281+
/// `TimelyStack` is `len() == capacity()`, which silently becomes
282+
/// false again the moment a push past capacity grows the backing
283+
/// allocation. Without per-element yielding, a single `extract` call
284+
/// can quietly produce oversized output chunks.
272285
fn extract(
273286
&mut self,
287+
position: &mut usize,
274288
upper: AntichainRef<Self::TimeOwned>,
275289
frontier: &mut Antichain<Self::TimeOwned>,
276290
keep: &mut Self,
@@ -405,22 +419,27 @@ pub mod container {
405419
let mut keep = self.empty(stash);
406420
let mut ready = self.empty(stash);
407421

408-
for chunk in merged {
409-
for (data, time, diff) in chunk {
422+
for mut chunk in merged {
423+
// Go update-by-update to swap out full containers.
424+
for (data, time, diff) in chunk.drain(..) {
410425
if upper.less_equal(&time) {
411426
frontier.insert_with(&time, |time| time.clone());
412427
keep.push((data, time, diff));
413428
} else {
414429
ready.push((data, time, diff));
415430
}
431+
if keep.at_capacity() {
432+
kept.push(std::mem::take(&mut keep));
433+
keep = self.empty(stash);
434+
}
435+
if ready.at_capacity() {
436+
ship.push(std::mem::take(&mut ready));
437+
ready = self.empty(stash);
438+
}
416439
}
417-
if keep.at_capacity() {
418-
kept.push(std::mem::take(&mut keep));
419-
keep = self.empty(stash);
420-
}
421-
if ready.at_capacity() {
422-
ship.push(std::mem::take(&mut ready));
423-
ready = self.empty(stash);
440+
// Recycle the now-empty chunk if it has the right capacity.
441+
if chunk.capacity() == Self::target_capacity() {
442+
stash.push(chunk);
424443
}
425444
}
426445
if !keep.is_empty() { kept.push(keep); }
@@ -542,16 +561,20 @@ pub mod container {
542561
let mut ready = self.empty(stash);
543562

544563
for mut buffer in merged {
545-
buffer.extract(upper, frontier, &mut keep, &mut ready);
546-
self.recycle(buffer, stash);
547-
if keep.at_capacity() {
548-
kept.push(std::mem::take(&mut keep));
549-
keep = self.empty(stash);
550-
}
551-
if ready.at_capacity() {
552-
ship.push(std::mem::take(&mut ready));
553-
ready = self.empty(stash);
564+
let mut position = 0;
565+
let len = buffer.len();
566+
while position < len {
567+
buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
568+
if keep.at_capacity() {
569+
kept.push(std::mem::take(&mut keep));
570+
keep = self.empty(stash);
571+
}
572+
if ready.at_capacity() {
573+
ship.push(std::mem::take(&mut ready));
574+
ready = self.empty(stash);
575+
}
554576
}
577+
self.recycle(buffer, stash);
555578
}
556579
if !keep.is_empty() {
557580
kept.push(keep);
@@ -610,6 +633,7 @@ pub mod container {
610633
while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
611634
let (d1, t1, _) = &other1[positions[0]];
612635
let (d2, t2, _) = &other2[positions[1]];
636+
// NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release.
613637
match (d1, t1).cmp(&(d2, t2)) {
614638
Ordering::Less => {
615639
self.push(other1[positions[0]].clone());
@@ -638,18 +662,22 @@ pub mod container {
638662

639663
fn extract(
640664
&mut self,
665+
position: &mut usize,
641666
upper: AntichainRef<T>,
642667
frontier: &mut Antichain<T>,
643668
keep: &mut Self,
644669
ship: &mut Self,
645670
) {
646-
for (data, time, diff) in self.drain(..) {
671+
let len = self.len();
672+
while *position < len && !keep.at_capacity() && !ship.at_capacity() {
673+
let (data, time, diff) = self[*position].clone();
647674
if upper.less_equal(&time) {
648675
frontier.insert_with(&time, |time| time.clone());
649676
keep.push((data, time, diff));
650677
} else {
651678
ship.push((data, time, diff));
652679
}
680+
*position += 1;
653681
}
654682
}
655683
}
@@ -747,18 +775,22 @@ pub mod container {
747775

748776
fn extract(
749777
&mut self,
778+
position: &mut usize,
750779
upper: AntichainRef<T>,
751780
frontier: &mut Antichain<T>,
752781
keep: &mut Self,
753782
ship: &mut Self,
754783
) {
755-
for (data, time, diff) in self.iter() {
784+
let len = self[..].len();
785+
while *position < len && !keep.at_capacity() && !ship.at_capacity() {
786+
let (data, time, diff) = &self[*position];
756787
if upper.less_equal(time) {
757788
frontier.insert_with(time, |time| time.clone());
758789
keep.copy_destructured(data, time, diff);
759790
} else {
760791
ship.copy_destructured(data, time, diff);
761792
}
793+
*position += 1;
762794
}
763795
}
764796
}

0 commit comments

Comments
 (0)