diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index c40b91f708..37e2bf0f2f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import org.apache.ratis.util.CodeInjectionForTesting; /** @@ -357,6 +358,7 @@ void appendToOpenSegment(LogEntryProto entry, Op op) { append(true, entry, op); } + public static final String APPEND_RECORD = LogSegment.class.getSimpleName() + ".append"; private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { Objects.requireNonNull(entry, "entry == null"); final LogRecord currentLast = records.getLast(); @@ -371,7 +373,11 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) { final LogRecord record = new LogRecord(totalFileSize, entry); if (keepEntryInCache) { + // It is important to put the entry into the cache before appending the + // record to the record list. Otherwise, a reader thread may get the + // record from the list but not the entry from the cache. putEntryCache(record.getTermIndex(), entry, op); + CodeInjectionForTesting.execute(APPEND_RECORD, this, record.getTermIndex()); } records.append(record); totalFileSize += getEntrySize(entry, op); diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index a772b00029..3697c546a2 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -67,6 +67,7 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -873,4 +874,48 @@ public void testAsyncFlushPerf2(Boolean useAsyncFlush, Boolean smSyncFlush) thro " ns with asyncFlush " + useAsyncFlush); } } + + @Test + public void testStateBetweenCacheAndRecordAppend() throws Exception { + final AtomicReference testError = new AtomicReference<>(); + final CountDownLatch injectionHit = new CountDownLatch(1); + + final CodeInjectionForTesting.Code code = (localId, remoteId, args) -> { + final LogSegment segment = (LogSegment) localId; + final TermIndex ti = (TermIndex) remoteId; + try { + // entry is in the cache but the record is not in the record list. + Assertions.assertNotNull(segment.getEntryFromCache(ti)); + Assertions.assertNull(segment.getLogRecord(ti.getIndex())); + } catch (Throwable t) { + testError.set(t); + } finally { + injectionHit.countDown(); + } + return true; + }; + + try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) { + CodeInjectionForTesting.put(LogSegment.APPEND_RECORD, code); + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + final LogEntryProto newEntry = prepareLogEntry(1, 0, () -> "newEntry", false); + + raftLog.appendEntry(newEntry).join(); + + Assertions.assertTrue(injectionHit.await(5, TimeUnit.SECONDS), "Injection point was not hit."); + + final Throwable t = testError.get(); + if (t != null) { + throw new Exception("Test failed", t); + } + + // after the append, both entry and record should be available. + final LogSegment segment = raftLog.getRaftLogCache().getOpenSegment(); + final TermIndex ti = segment.getLastTermIndex(); + Assertions.assertNotNull(segment.getEntryFromCache(ti)); + Assertions.assertNotNull(segment.getLogRecord(ti.getIndex())); + } finally { + CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD); + } + } }