Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions crates/commitlog/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,30 +342,39 @@ impl StoredCommit {
}
}

/// Numbers needed to compute [`crate::segment::Header`].
/// A [`StoredCommit`] sans the records payload.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Metadata {
pub tx_range: Range<u64>,
pub size_in_bytes: u64,
pub epoch: u64,
pub checksum: u32,
}

impl Metadata {
/// Extract the [`Metadata`] of a single [`Commit`] from the given reader.
/// Extract the [`Metadata`] of a single [`StoredCommit`] from the given
/// reader.
///
/// Note that this decodes the commit due to checksum verification.
/// Like [`Commit::decode`], returns `None` if the reader is at EOF already.
/// Like [`StoredCommit::decode`], this method returns `None` if the reader
/// is at EOF already.
pub fn extract<R: io::Read>(reader: R) -> io::Result<Option<Self>> {
Commit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
StoredCommit::decode(reader).map(|maybe_commit| maybe_commit.map(Self::from))
}
}

impl From<Commit> for Metadata {
fn from(commit: Commit) -> Self {
impl From<StoredCommit> for Metadata {
fn from(commit: StoredCommit) -> Self {
let tx_range = commit.tx_range();
let epoch = commit.epoch;
let checksum = commit.checksum;
let size_in_bytes = Commit::from(commit).encoded_len() as u64;

Self {
tx_range: commit.tx_range(),
size_in_bytes: commit.encoded_len() as u64,
epoch: commit.epoch,
tx_range,
size_in_bytes,
epoch,
checksum,
}
}
}
Expand Down
151 changes: 140 additions & 11 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
error::{self, source_chain},
index::IndexError,
payload::Decoder,
repo::{self, Repo, TxOffsetIndex},
repo::{self, Repo, SegmentLen as _, TxOffsetIndex},
segment::{self, FileLike, Transaction, Writer},
Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
};
Expand Down Expand Up @@ -355,7 +355,142 @@ impl<R: Repo, T> Drop for Generic<R, T> {
}
}

/// Extract the most recently written [`segment::Metadata`] from the commitlog
/// The most recent non empty segment in repo `R`.
///
/// Created by [open_newest_non_empty_segment].
struct MostRecentNonEmptySegment<R> {
/// Number of empty segments that were ignored.
empty_segments: usize,
/// Offset of the non-empty segment.
segment_offset: u64,
/// [Repo::SegmentReader] for the non-empty segment.
segment_reader: R,
}

/// Open the most recent segment in `repo` that is larger than
/// [segment::Header::LEN].
///
/// Note that there should be at most one empty segment in the log. We may,
/// however, want to be lenient on this read-only path, so the number of
/// empty segments is tracked in the returned type rather than returning an
/// error.
fn open_newest_non_empty_segment<R: Repo>(repo: R) -> io::Result<Option<MostRecentNonEmptySegment<R::SegmentReader>>> {
let mut segments = repo.existing_offsets()?;

let mut empty_segments = 0;
let mut segment_offset;
let mut segment_reader;
loop {
let Some(last) = segments.pop() else {
return Ok(None);
};
segment_offset = last;
segment_reader = repo.open_segment_reader(segment_offset)?;
if segment_reader.segment_len()? > segment::Header::LEN as u64 {
break;
} else {
empty_segments += 1;
}
}

Ok(Some(MostRecentNonEmptySegment {
empty_segments,
segment_offset,
segment_reader,
}))
}

/// The most recently written [segment::Metadata] for a given [Repo].
///
/// The type preserves the error information in case the most recent segment
/// contains corrupted data at the end (typically due to a torn write).
///
/// Created by [committed_meta].
pub enum CommittedMeta {
/// The most recent segment could not be traversed successfully until the
/// end, i.e. there is trailing garbage in the segment.
///
/// This variant is also returned in case [open_newest_non_empty_segment]
/// finds more than a single empty segment at the end of the log.
Prefix {
/// The metadata of the prefix that could be traversed successfully.
///
/// It is guaranteed that the metadata spans at least one commit.
metadata: segment::Metadata,
/// The error encountered.
error: io::Error,
},
/// The most recent segment could be traversed successfully until the end.
Complete {
/// The segment metadata.
///
/// It is guaranteed that the metadata spans at least one commit.
metadata: segment::Metadata,
},
}

impl CommittedMeta {
pub fn metadata(&self) -> &segment::Metadata {
let (Self::Prefix { metadata, .. } | Self::Complete { metadata }) = self;
metadata
}

fn extract(repo: impl Repo) -> io::Result<Option<Self>> {
let Some(MostRecentNonEmptySegment {
empty_segments,
segment_offset,
mut segment_reader,
}) = open_newest_non_empty_segment(&repo)?
else {
return Ok(None);
};
let offset_index = repo.get_offset_index(segment_offset).ok();
match segment::Metadata::extract(segment_offset, &mut segment_reader, offset_index.as_ref()) {
// Segment is intact.
Ok(metadata) if empty_segments <= 1 => {
assert!(
!metadata.tx_range.is_empty(),
"segment was promised to be non-empty but contains zero transactions"
);
Ok(Some(CommittedMeta::Complete { metadata }))
}
// Segment is good, but there are too many empty segments.
Ok(metadata) => Ok(Some(CommittedMeta::Prefix {
metadata,
error: io::Error::new(
io::ErrorKind::InvalidData,
format!("repo {}: too many empty segments: {}", repo, empty_segments),
),
})),
// Segment is non-empty, but first commit is corrupt.
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) if sofar.tx_range.is_empty() => {
Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"repo {}: first commit in the most recent segment is corrupt: {}",
repo, source
),
))
}
// Some prefix of the segment is good.
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => Ok(Some(CommittedMeta::Prefix {
metadata: sofar,
error: source,
})),
// Something went wrong, including out-of-order errors and such.
Err(error::SegmentMetadata::Io(e)) => Err(e),
}
}
}

impl From<CommittedMeta> for segment::Metadata {
fn from(meta: CommittedMeta) -> Self {
let (CommittedMeta::Prefix { metadata, .. } | CommittedMeta::Complete { metadata }) = meta;
metadata
}
}

/// Extract the most recently written [CommittedMeta] from the commitlog
/// in `repo`.
///
/// Returns `None` if the commitlog is empty.
Expand All @@ -373,18 +508,12 @@ impl<R: Repo, T> Drop for Generic<R, T> {
/// like so:
///
/// ```ignore
/// let max_offset = committed_meta(..)?.map(|meta| meta.tx_range.end);
/// let max_offset = committed_meta(..)?.map(|meta| meta.metadata().tx_range.end);
/// ```
///
/// Unlike `open`, no segment will be created in an empty `repo`.
pub fn committed_meta(repo: impl Repo) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
let Some(last) = repo.existing_offsets()?.pop() else {
return Ok(None);
};

let mut storage = repo.open_segment_reader(last)?;
let offset_index = repo.get_offset_index(last).ok();
segment::Metadata::extract(last, &mut storage, offset_index.as_ref()).map(Some)
pub fn committed_meta(repo: impl Repo) -> io::Result<Option<CommittedMeta>> {
CommittedMeta::extract(repo)
}

pub fn commits_from<R: Repo>(repo: R, max_log_format_version: u8, offset: u64) -> io::Result<Commits<R>> {
Expand Down
3 changes: 2 additions & 1 deletion crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod varint;

pub use crate::{
commit::{Commit, StoredCommit},
commitlog::CommittedMeta,
payload::{Decoder, Encode},
repo::fs::SizeOnDisk,
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
Expand Down Expand Up @@ -558,7 +559,7 @@ impl<T: Encode> Commitlog<T> {
/// ```
///
/// Unlike `open`, no segment will be created in an empty `repo`.
pub fn committed_meta(root: CommitLogDir) -> Result<Option<segment::Metadata>, error::SegmentMetadata> {
pub fn committed_meta(root: CommitLogDir) -> io::Result<Option<CommittedMeta>> {
commitlog::committed_meta(repo::Fs::new(root, None)?)
}

Expand Down
1 change: 1 addition & 0 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub fn resume_segment_writer<R: Repo>(
size_in_bytes,
max_epoch,
max_commit_offset: _,
max_commit: _,
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
warn!("invalid commit in segment {offset}: {source}");
Expand Down
37 changes: 28 additions & 9 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ pub struct Metadata {
/// `max_commit_offset..tx_range.end` is the range of
/// transactions contained in it.
pub max_commit_offset: u64,
pub max_commit: Option<commit::Metadata>,
}

impl Metadata {
Expand Down Expand Up @@ -627,6 +628,7 @@ impl Metadata {
size_in_bytes: Header::LEN as u64,
max_epoch: u64::default(),
max_commit_offset: min_tx_offset,
max_commit: None,
});

reader.seek(SeekFrom::Start(sofar.size_in_bytes))?;
Expand Down Expand Up @@ -663,6 +665,7 @@ impl Metadata {
// TODO: Should it be an error to encounter an epoch going backwards?
sofar.max_epoch = commit.epoch.max(sofar.max_epoch);
sofar.max_commit_offset = commit.tx_range.start;
sofar.max_commit = Some(commit);
}

Ok(sofar)
Expand Down Expand Up @@ -695,6 +698,7 @@ impl Metadata {
size_in_bytes: byte_offset + commit.size_in_bytes,
max_epoch: commit.epoch,
max_commit_offset: commit.tx_range.start,
max_commit: Some(commit),
});
}

Expand Down Expand Up @@ -833,18 +837,33 @@ mod tests {
writer.commit().unwrap();

let reader = repo::open_segment_reader(&repo, DEFAULT_LOG_FORMAT_VERSION, 0).unwrap();
let metadata = reader.metadata().unwrap();
let Metadata {
header,
tx_range,
size_in_bytes,
max_epoch,
max_commit_offset,
max_commit,
} = reader.metadata().unwrap();

assert_eq!(
metadata,
Metadata {
header: Header::default(),
tx_range: Range { start: 0, end: 5 },
(
header,
tx_range,
size_in_bytes,
max_epoch,
max_commit_offset,
max_commit.is_some_and(|meta| meta.tx_range == (3..5))
),
(
Header::default(),
0..5,
// header + 5 txs + 3 commits
size_in_bytes: (Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
max_epoch: Commit::DEFAULT_EPOCH,
max_commit_offset: 3
}
(Header::LEN + (5 * 32) + (3 * Commit::FRAMING_LEN)) as u64,
Commit::DEFAULT_EPOCH,
3,
true
)
);
}

Expand Down
Loading