Skip to content

Commit b76107e

Browse files
committed
simplified (hopefully)
1 parent e69fcc6 commit b76107e

10 files changed

Lines changed: 191 additions & 207 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ thiserror = "1"
1717
tracing = "0.1.37"
1818

1919
[dev-dependencies]
20+
mock_instant = "0.3"
2021
criterion = "0.4"
2122
futures = "0.3"
2223
proptest = "1"

src/lib.rs

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,16 @@ pub use multi_record_log::{MultiRecordLog, SyncPolicy};
1717
#[derive(Debug)]
1818
pub struct Record<'a> {
1919
pub position: u64,
20-
payload: PagesBuf<'a>,
20+
pub payload: PagesBuf<'a>,
2121
}
2222

23-
2423
impl<'a> Record<'a> {
2524
#[cfg(test)]
26-
pub fn payload_equal(&self, mut payload: &[u8]) -> bool {
27-
use bytes::Buf;
28-
let mut self_payload = self.payload;
29-
if self_payload.remaining() != payload.len() {
30-
return false;
31-
}
32-
while self_payload.has_remaining() {
33-
let chunk = self_payload.chunk();
34-
let chunk_len = chunk.len();
35-
if chunk != &payload[..chunk_len] {
36-
return false;
37-
}
38-
self_payload.advance(chunk_len);
39-
payload = &payload[chunk_len..];
40-
}
41-
true
25+
pub fn payload_equal(&self, payload: &[u8]) -> bool {
26+
self.payload.to_cow() == payload
4227
}
4328
}
4429

45-
// impl<'a> Record<'a> {
46-
// pub fn new(position: u64, payload: &'a [u8]) -> Self {
47-
// Record {
48-
// position,
49-
// payload: Cow::Borrowed(payload),
50-
// }
51-
// }
52-
// }
53-
5430
#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)]
5531
pub struct FileNumber {
5632
file_number: Arc<u64>,

src/mem/arena.rs

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::time::{Duration, Instant};
1+
use std::time::Duration;
2+
#[cfg(not(test))]
3+
use std::time::Instant;
4+
5+
#[cfg(test)]
6+
use mock_instant::Instant;
27

38
#[cfg(not(test))]
49
pub const PAGE_SIZE: usize = 1 << 20;
@@ -60,7 +65,7 @@ impl Default for ArenaStats {
6065
fn default() -> ArenaStats {
6166
ArenaStats {
6267
// We arbitrarily initialize num used pages former to 100.
63-
max_num_used_pages_former: 100,
68+
max_num_used_pages_former: 0,
6469
max_num_used_pages_current: 0,
6570
call_counter: 0u8,
6671
next_window_start: Instant::now(),
@@ -76,35 +81,27 @@ impl ArenaStats {
7681
self.next_window_start = now + WINDOW;
7782
}
7883

84+
/// Records the number of used pages, and returns an estimation of the maximum number of pages
85+
/// in the last 5 minutes.
7986
pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize {
8087
// The only function of the call counter is to avoid calling `Instant::now()`
8188
// at every single call.
82-
self.call_counter = self.call_counter.wrapping_add(1);
89+
self.call_counter = (self.call_counter + 1) % 64;
8390
if self.call_counter == 0u8 {
8491
let now = Instant::now();
8592
if now > self.next_window_start {
8693
self.roll(now);
8794
}
8895
}
8996
self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages);
90-
self.target_num_pages()
91-
}
92-
93-
// This method returns a target number of pages.
94-
//
95-
// If we currently have a number of allocated pages higher than this, we need to free
96-
// pages until we reach this number.
97-
fn target_num_pages(&self) -> usize {
98-
let max_over_both_windows = self
99-
.max_num_used_pages_former
100-
.max(self.max_num_used_pages_current);
101-
(max_over_both_windows + 10).max(max_over_both_windows * 105 / 100)
97+
self.max_num_used_pages_former
98+
.max(self.max_num_used_pages_current)
10299
}
103100
}
104101

105102
impl Arena {
106103
/// Returns an allocated page id.
107-
pub fn get_page_id(&mut self) -> PageId {
104+
pub fn acquire_page(&mut self) -> PageId {
108105
if let Some(page_id) = self.free_page_ids.pop() {
109106
assert!(self.pages[page_id.0].is_some());
110107
return page_id;
@@ -141,7 +138,11 @@ impl Arena {
141138
/// `gc` releases memory by deallocating ALL of the free pages.
142139
pub fn gc(&mut self) {
143140
let num_used_pages = self.num_used_pages();
144-
let target_num_pages = self.stats.record_num_used_page(num_used_pages);
141+
let max_used_num_pages_in_last_5_min = self.stats.record_num_used_page(num_used_pages);
142+
// We pick a target slightly higher than the maximum number of pages used in the last 5
143+
// minutes to avoid needless allocations when we are experience a general increase
144+
// in memory usage.
145+
let target_num_pages = (max_used_num_pages_in_last_5_min * 105 / 100).max(10);
145146
let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages);
146147
assert!(num_pages_to_free <= self.free_page_ids.len());
147148
for _ in 0..num_pages_to_free {
@@ -162,40 +163,63 @@ impl Arena {
162163
self.pages.len() - self.free_slots.len() - self.free_page_ids.len()
163164
}
164165

165-
pub fn capacity(&self) -> usize {
166-
self.num_allocated_pages() * PAGE_SIZE
167-
}
168-
169166
pub fn unused_capacity(&self) -> usize {
170167
self.free_page_ids.len() * PAGE_SIZE
171168
}
172169
}
173170

174171
#[cfg(test)]
175172
mod tests {
173+
use mock_instant::MockClock;
174+
176175
use super::*;
177176

178177
#[test]
179178
fn test_arena_simple() {
180179
let mut arena = Arena::default();
181-
assert_eq!(arena.capacity(), 0);
182-
assert_eq!(arena.get_page_id(), PageId(0));
183-
assert_eq!(arena.get_page_id(), PageId(1));
180+
assert_eq!(arena.num_allocated_pages(), 0);
181+
assert_eq!(arena.acquire_page(), PageId(0));
182+
assert_eq!(arena.acquire_page(), PageId(1));
184183
arena.release_page(PageId(0));
185-
assert_eq!(arena.get_page_id(), PageId(0));
184+
assert_eq!(arena.acquire_page(), PageId(0));
186185
}
187186

188187
#[test]
189188
fn test_arena_gc() {
190189
let mut arena = Arena::default();
191-
assert_eq!(arena.capacity(), 0);
192-
assert_eq!(arena.get_page_id(), PageId(0));
193-
assert_eq!(arena.get_page_id(), PageId(1));
190+
assert_eq!(arena.num_allocated_pages(), 0);
191+
assert_eq!(arena.acquire_page(), PageId(0));
192+
assert_eq!(arena.acquire_page(), PageId(1));
194193
arena.release_page(PageId(1));
195194
assert_eq!(arena.num_allocated_pages(), 2);
196195
arena.gc();
197196
assert_eq!(arena.num_allocated_pages(), 2);
198-
assert_eq!(arena.get_page_id(), PageId(1));
197+
assert_eq!(arena.acquire_page(), PageId(1));
199198
assert_eq!(arena.num_allocated_pages(), 2);
200199
}
200+
201+
#[test]
202+
fn test_arena_stats() {
203+
let mut arena_stats = ArenaStats::default();
204+
for _ in 0..256 {
205+
assert_eq!(arena_stats.record_num_used_page(10), 10);
206+
}
207+
MockClock::advance(WINDOW.mul_f32(1.1f32));
208+
for _ in 0..256 {
209+
assert_eq!(arena_stats.record_num_used_page(1), 10);
210+
}
211+
MockClock::advance(WINDOW.mul_f32(1.1f32));
212+
for _ in 0..256 {
213+
arena_stats.record_num_used_page(1);
214+
}
215+
assert_eq!(arena_stats.record_num_used_page(1), 1);
216+
assert_eq!(arena_stats.record_num_used_page(2), 2);
217+
for _ in 0..256 {
218+
assert_eq!(arena_stats.record_num_used_page(1), 2);
219+
}
220+
MockClock::advance(WINDOW);
221+
for _ in 0..256 {
222+
assert_eq!(arena_stats.record_num_used_page(1), 2);
223+
}
224+
}
201225
}

src/mem/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ mod queue;
33
mod queues;
44
mod rolling_buffer;
55

6-
use self::arena::{Arena, PAGE_SIZE};
6+
use self::arena::Arena;
77
pub(crate) use self::queue::MemQueue;
88
pub(crate) use self::queues::MemQueues;
9-
use self::rolling_buffer::RollingBuffer;
109
pub use self::rolling_buffer::PagesBuf;
10+
use self::rolling_buffer::RollingBuffer;
1111

1212
#[cfg(test)]
1313
mod tests;

src/mem/queue.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
use std::collections::VecDeque;
12
use std::ops::{Bound, RangeBounds};
3+
24
use crate::error::AppendError;
35
use crate::mem::{Arena, RollingBuffer};
46
use crate::{FileNumber, Record};
@@ -16,30 +18,17 @@ struct RecordMeta {
1618
pub(crate) struct MemQueue {
1719
// Concatenated records
1820
concatenated_records: RollingBuffer,
21+
// If `record_metas` is not empty, `start_position` should be the position of the first record.
1922
start_position: u64,
20-
record_metas: Vec<RecordMeta>,
23+
record_metas: VecDeque<RecordMeta>,
2124
}
2225

23-
// fn concatenate_buffers<'a>(mut buf: impl Buf + 'a) -> Cow<'a, [u8]> {
24-
// let first_chunk: &'a buf = buf.chunk();
25-
// if buf.remaining() == first_chunk.len() {
26-
// return Cow::Borrowed(first_chunk);
27-
// }
28-
// let mut concatenated_buffer: Vec<u8> = Vec::with_capacity(buf.remaining());
29-
// while buf.has_remaining() {
30-
// let chunk = buf.chunk();
31-
// concatenated_buffer.extend_from_slice(chunk);
32-
// buf.advance(chunk.len());
33-
// }
34-
// Cow::Owned(concatenated_buffer)
35-
// }
36-
3726
impl MemQueue {
3827
pub fn with_next_position(next_position: u64) -> Self {
3928
MemQueue {
4029
concatenated_records: RollingBuffer::new(),
4130
start_position: next_position,
42-
record_metas: Vec::new(),
31+
record_metas: Default::default(),
4332
}
4433
}
4534

@@ -54,10 +43,10 @@ impl MemQueue {
5443

5544
/// Returns the last record stored in the queue.
5645
pub fn last_record<'a>(&'a self, arena: &'a Arena) -> Option<Record<'a>> {
57-
let record = self.record_metas.last()?;
46+
let record = self.record_metas.back()?;
5847
let record_payload = self
5948
.concatenated_records
60-
.get_range_buf(record.start_offset.., arena);
49+
.get_range(record.start_offset.., arena);
6150
Some(Record {
6251
position: record.position,
6352
payload: record_payload,
@@ -67,7 +56,7 @@ impl MemQueue {
6756
/// Returns what the next position should be.
6857
pub fn next_position(&self) -> u64 {
6958
self.record_metas
70-
.last()
59+
.back()
7160
.map(|record| record.position + 1)
7261
.unwrap_or(self.start_position)
7362
}
@@ -92,7 +81,7 @@ impl MemQueue {
9281
self.start_position = target_position;
9382
}
9483

95-
let file_number = if let Some(record_meta) = self.record_metas.last_mut() {
84+
let file_number = if let Some(record_meta) = self.record_metas.back_mut() {
9685
if record_meta.file_number.as_ref() == Some(file_number) {
9786
record_meta.file_number.take().unwrap()
9887
} else {
@@ -103,11 +92,11 @@ impl MemQueue {
10392
};
10493

10594
let record_meta = RecordMeta {
106-
start_offset: self.concatenated_records.len(),
95+
start_offset: self.concatenated_records.end_offset(),
10796
file_number: Some(file_number),
10897
position: target_position,
10998
};
110-
self.record_metas.push(record_meta);
99+
self.record_metas.push_back(record_meta);
111100
self.concatenated_records.extend_from_slice(payload, arena);
112101
Ok(())
113102
}
@@ -149,9 +138,9 @@ impl MemQueue {
149138
} else {
150139
Bound::Unbounded
151140
};
152-
let payload= self
141+
let payload = self
153142
.concatenated_records
154-
.get_range_buf((start_bound, end_bound), arena);
143+
.get_range((start_bound, end_bound), arena);
155144
// let payload = concatenate_buffers(payload_buf);
156145
Record { position, payload }
157146
})
@@ -178,11 +167,8 @@ impl MemQueue {
178167

179168
let start_offset_to_keep: usize = self.record_metas[first_record_to_keep].start_offset;
180169
self.record_metas.drain(..first_record_to_keep);
181-
// for record_meta in &mut self.record_metas {
182-
// record_meta.start_offset -= start_offset_to_keep;
183-
// }
184170
self.concatenated_records
185-
.truncate_up_to_included(start_offset_to_keep, arena);
171+
.truncate_up_to_excluded(start_offset_to_keep, arena);
186172
self.start_position = truncate_up_to_pos + 1;
187173
first_record_to_keep
188174
}

src/mem/queues.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,7 @@ impl MemQueues {
165165
let size = self
166166
.queues
167167
.iter()
168-
.map(|(name, queue)| {
169-
dbg!(queue.size());
170-
name.len() + queue.size()
171-
})
168+
.map(|(name, queue)| name.len() + queue.size())
172169
.sum();
173170

174171
let capacity = self

0 commit comments

Comments
 (0)