Skip to content

Commit 79b0e35

Browse files
committed
Use direct I/O for commitlog compression
1 parent 1cba303 commit 79b0e35

2 files changed

Lines changed: 17 additions & 8 deletions

File tree

  • crates/commitlog

crates/commitlog/src/repo/fs.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55

66
use log::{debug, warn};
77
use spacetimedb_fs_utils::compression::{compress_with_zstd, CompressReader};
8+
use spacetimedb_fs_utils::direct_io;
89
use spacetimedb_paths::server::{CommitLogDir, SegmentFile};
910
use tempfile::NamedTempFile;
1011

@@ -29,7 +30,7 @@ pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static;
2930
/// Size on disk of a [Fs] repo.
3031
///
3132
/// Created by [Fs::size_on_disk].
32-
#[derive(Clone, Copy, Default)]
33+
#[derive(Clone, Copy, Debug, Default)]
3334
pub struct SizeOnDisk {
3435
/// The total size in bytes of all segments and offset indexes in the repo.
3536
pub total_bytes: u64,
@@ -156,7 +157,7 @@ impl FileLike for NamedTempFile {
156157

157158
impl Repo for Fs {
158159
type SegmentWriter = File;
159-
type SegmentReader = CompressReader;
160+
type SegmentReader = CompressReader<File>;
160161

161162
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
162163
File::options()
@@ -213,18 +214,21 @@ impl Repo for Fs {
213214
}
214215

215216
fn compress_segment(&self, offset: u64) -> io::Result<()> {
216-
let src = self.open_segment_reader(offset)?;
217+
let segment_path = self.segment_path(offset);
218+
let src = direct_io::file_reader(&segment_path).and_then(CompressReader::new)?;
217219
// if it's already compressed, leave it be
218220
let CompressReader::None(mut src) = src else {
219221
return Ok(());
220222
};
221223

222-
let mut dst = NamedTempFile::new_in(&self.root)?;
224+
let tmp = NamedTempFile::new_in(&self.root)?.into_temp_path();
225+
let mut dst = direct_io::file_writer(&tmp)?;
223226
// bytes per frame. in the future, it might be worth looking into putting
224227
// every commit into its own frame, to make seeking more efficient.
225228
let max_frame_size = 0x1000;
226229
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
227-
dst.persist(self.segment_path(offset))?;
230+
dst.get_ref().sync_all()?;
231+
tmp.persist(segment_path)?;
228232

229233
Ok(())
230234
}
@@ -266,7 +270,7 @@ impl Repo for Fs {
266270
}
267271
}
268272

269-
impl SegmentLen for CompressReader {}
273+
impl<R: io::Read + io::Seek> SegmentLen for CompressReader<R> {}
270274

271275
#[cfg(feature = "streaming")]
272276
impl crate::stream::AsyncRepo for Fs {

crates/commitlog/tests/random_payload/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ fn compression() {
8787
let clog = Commitlog::open(
8888
CommitLogDir::from_path_unchecked(root.path()),
8989
Options {
90-
max_segment_size: 8 * 1024,
90+
max_segment_size: 16 * 1024,
9191
max_records_in_commit: NonZeroU16::MIN,
9292
..Options::default()
9393
},
@@ -111,7 +111,12 @@ fn compression() {
111111
clog.compress_segments(segments_to_compress).unwrap();
112112

113113
let compressed_size = clog.size_on_disk().unwrap();
114-
assert!(compressed_size.total_bytes < uncompressed_size.total_bytes);
114+
assert!(
115+
compressed_size.total_bytes < uncompressed_size.total_bytes,
116+
"expected total size to be smaller after compression: uncompressed={:?} compressed={:?}",
117+
uncompressed_size,
118+
compressed_size
119+
);
115120

116121
assert!(clog
117122
.transactions(&payload::ArrayDecoder)

0 commit comments

Comments
 (0)