feature: Compression for memfile/rootfs assets#2034

Open
levb wants to merge 162 commits into main from lev-compression-final

Conversation

Contributor

@levb levb commented Mar 2, 2026

Summary

Compression for data files (memfile, rootfs). Files are broken into independently decompressible frames (2 MiB, zstd), stored in GCS alongside V4 headers with per-mapping frame tables. Fully backward-compatible: the read path auto-detects V3/V4 headers and routes compressed vs. uncompressed reads per mapping. Gated by the compress-config LaunchDarkly flag (per-team/cluster/template targeting).

What changed

  • FramedFile interface replaces Seekable — unified GetFrame(ctx, offset, frameTable, decompress, buf, readSize, onRead) handles both compressed and uncompressed data
  • V4 header with FrameTable per mapping + BuildFileInfo (uncompressed size, SHA-256 checksum) per build; LZ4-block-compressed header blob
  • NFS cache extended for compressed frames (.frm files keyed by compressed offset+size); progressive streaming decompression on cache miss; write-through on upload
  • P2P resume integration — peers read uncompressed from origin during upload, then atomically swap to V4 header (CAS) when origin signals use_storage with serialized headers
  • compress-build CLI for background compression of existing uncompressed builds (supports --recursive for dependency chains)
  • New Chunker with mmap cache, and fetch sessions dedupe replacing streaming_chunk.go

Read path

  NBD/UFFD/Prefetch
    → header.GetShiftedMapping(offset) → BuildMap + FrameTable
    → DiffStore.Get(ctx, diff)         → cached Chunker
    → Chunker.GetBlock(offset, len, ft)
        → mmap hit? return reference
        → miss: fetchSession (dedup) → GetFrame
            → NFS hit? decompress from disk → mmap
            → NFS miss? GCS range read → decompress → mmap + NFS write-back
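
As a rough illustration of the per-frame lookup the read path performs, the sketch below maps an uncompressed offset to a frame index and the compressed byte range to fetch from GCS. The cumulative-offset scan and all names are assumptions for illustration, not the PR's actual FrameTable code:

```go
package main

import "fmt"

const frameSize = 2 << 20 // 2 MiB uncompressed frames

// frameFor is a toy FrameTable lookup: given an uncompressed offset and the
// compressed size of each frame, return the frame index plus the compressed
// start/size to range-read from storage.
func frameFor(offset int64, compressedSizes []int64) (idx int, start, size int64) {
	idx = int(offset / frameSize)
	for i := 0; i < idx; i++ {
		start += compressedSizes[i] // compressed frames are laid out back to back
	}
	return idx, start, compressedSizes[idx]
}

func main() {
	// Hypothetical compressed sizes for three 2 MiB frames.
	sizes := []int64{700_000, 650_000, 720_000}
	idx, start, size := frameFor(5<<20, sizes) // offset 5 MiB lands in frame 2
	fmt.Println(idx, start, size)              // prints "2 1350000 720000"
}
```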

P2P header switchover

  Origin (pause):
    snapshot → register buildID in Redis → serve mmap cache via gRPC
    background: upload compressed data + V4 headers to GCS
    on completion: uploadedBuilds.Set(buildID, serialized V4 headers)
                → peerRegistry.Unregister(buildID)

  Peer (resume, upload in progress):
    GetFrame(ft=nil) → gRPC stream → origin serves from mmap (uncompressed)

  Peer (origin signals use_storage):
    checkPeerAvailability() → transitionHeaders.Store({memH, rootH})
                            → uploaded.Store(true)
    next GetFrame(ft=nil): ft==nil + transitionHeaders != nil
      → return PeerTransitionedError{headers}
      → build.File.swapHeader(): Deserialize(bytes) → CompareAndSwap(old, new)
        first goroutine wins CAS; others see swapped header on retry
      → retry: GetFrame(ft!=nil) → NFS/GCS compressed (mmap mostly warm)

Benchmark results

End-to-end pause/resume

(BenchmarkBaseImage, 50 iterations, local disk):

  ┌──────────────┬─────────┬────────────┐
  │     Mode     │ Latency │ Build time │
  ├──────────────┼─────────┼────────────┤
  │ Uncompressed │ 97 ms   │ 61.0s      │
  ├──────────────┼─────────┼────────────┤
  │ LZ4:0        │ 100 ms  │ 61.4s      │
  ├──────────────┼─────────┼────────────┤
  │ Zstd:1       │ 100 ms  │ 60.9s      │
  ├──────────────┼─────────┼────────────┤
  │ Zstd:2       │ 102 ms  │ 62.4s      │
  ├──────────────┼─────────┼────────────┤
  │ Zstd:3       │ 98 ms   │ 61.7s      │
  └──────────────┴─────────┴────────────┘

Full architecture doc: docs/compression-architecture.md

levb and others added 18 commits February 27, 2026 05:52
…ning

- Use header.HugepageSize for uncompressed fetch alignment (semantically correct)
- Stream NFS cache hits directly into ReadFrame instead of buffering in memory
- Fix timer placement to cover full GetFrame (read + decompression)
- Fix onRead callback: nil for compressed inner calls (prevents double-invoke),
  pass through for uncompressed (bytes are final)
- Remove panic recovery from runFetch (never in main)
- Remove low-value chunker tests subsumed by ConcurrentStress
- Remove 4MB frame configs from benchmarks (targeting 2MB only)
- Remove unused readCacheFile function

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ble NFS cache

- Remove dead flagsClient chain through chunker/build/template layers (~15 files)
- Delete ChunkerConfigFlag (unused after flagsClient removal)
- Delete mock_flagsclient_test.go
- Simplify GetUploadOptions: remove redundant intOr/strOr fallbacks (flags have defaults)
- Add GetCompressionType helper to frame_table.go, deduplicate compression type extraction
- Replace [16]byte{} with uuid.Nil and "rootfs.ext4" with storage.RootfsName in inspect-build
- Simplify UploadV4Header return pattern
- Remove onRead callback from legacy fullFetchChunker (FullFetch should not use progressive reads)
- Re-enable NFS cache in template cache.go
- Remove all fmt.Printf debug instrumentation from orchestrator

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sionType threading

Add per-build file size and SHA-256 checksum to V4 headers, eliminating
the redundant Size() network call when opening upstream data files on
the read path. Checksums are computed for free by piggybacking on
CompressStream's existing frame iteration.

Remove the separate compressionType parameter threaded through
getBuild → newStorageDiff → NewChunker; the read path now derives
compression state from the per-mapping FrameTable directly.

V4 binary format change (not yet deployed):
  [Metadata] [LZ4: numBuilds, builds(uuid+size+checksum),
              numMappings, mappings...]

V3 path unchanged — falls back to Size() call when size is unknown.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
levb and others added 6 commits March 2, 2026 10:58
- Merge writeFrameToCache and writeChunkToCache into unified writeToCache
  with lock + atomic rename, used by all three cache write paths
- Fix file descriptor leak in cache hit paths: defer f.Close() and wrap
  in NopCloser so ReadFrame's close doesn't double-close the fd
- Add defer uploader.Close() in CompressStream so PartUploader file
  handles are released on error paths between Start() and Complete()
- Make Close() idempotent via sync.Once on fsPartUploader and filePartWriter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
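The idempotent-Close pattern from the last bullet can be sketched minimally — fsPartUploader here is a toy stand-in, not the real uploader:

```go
package main

import (
	"fmt"
	"sync"
)

// fsPartUploader is illustrative: Close is made idempotent via sync.Once so
// error-path cleanup and deferred cleanup can both call it safely.
type fsPartUploader struct {
	closeOnce sync.Once
	closed    int // counts how many times cleanup actually ran
}

func (u *fsPartUploader) Close() error {
	u.closeOnce.Do(func() {
		u.closed++ // release file handles exactly once
	})
	return nil
}

func main() {
	u := &fsPartUploader{}
	u.Close()
	u.Close() // second and later calls are no-ops
	fmt.Println(u.closed) // prints "1"
}
```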
levb and others added 2 commits March 3, 2026 06:09
The SHA-256 checksum in BuildFileInfo now covers uncompressed data,
making it useful for end-to-end integrity verification of the original
content. Updated inspect-build to use SHA-256 (replacing MD5) and
verify checksums against the header. Fixed early-return lint warnings.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
GetUploadOptions now accepts fileType and useCase parameters, enriching
the LD evaluation context so dashboard targeting rules can differentiate
(e.g. compress memfile but not rootfs, or builds but not pauses).
TemplateBuild accepts per-file opts directly instead of holding an ff
reference.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
levb changed the base branch from main to lev-paths-refactor April 3, 2026 06:33
github-actions bot and others added 16 commits April 3, 2026 06:36
…leanly

Close both the decompressor and raw stream before deciding whether to
write back to NFS cache. Previously a partial-read scenario could cache
incomplete compressed data if the decompressor failed to drain.

Also eliminates an unnecessary buffer copy by transferring ownership of
the backing array to the background writeback goroutine.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace SetFrames (always scans from frame 0) with SetFramesFrom in all
mapping split paths. In the "diff inside base" case, the left split
returns a cursor that the right split continues from, avoiding O(N²)
rescanning of the base FrameTable.

Add table-driven tests covering FrameTable propagation through all split
paths, including multi-layer headers with three compressed builds. Each
test case also verifies the read-path invariant: FrameFor must resolve
both the first and last byte of every mapping's storage range.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove unused methods, functions, and constants identified during
code review. Rename SetFramesFrom→SetFrames and SubsetFrom→Subset
now that their shorter-named wrappers are gone.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nfig logging

The integration_tests.yml workflow referenced inputs.compression which
was never defined, so artifact name prefixes never activated. Removed
since there's a single test run (always LZ4).

Added compression config logging to build and pause upload paths so
the active config is visible in logs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- CompressionType OTEL attribute used string(byte) producing "\x01"
  instead of "zstd"; use .String() method
- FS backend leaked raw reader when newDecompressingReadCloser failed;
  add raw.Close() on error (GCS already handled this correctly)
- Write timer leaked on compressed upload path; record Success/Failure
  after storeFileCompressed returns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…d cache metrics

- Restore cache.go to match main (identical to e2b/main)
- Use sliceDirect after fetch to avoid TOCTOU with coarse dirty granularity
- Defer setIsCached to runFetch after full chunk is written
- Restore CI workflow integration_tests.yml formatting (keep compression settings)
- Minimize storage_cache_metrics.go diff vs base

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hecks

compressStream: on read-loop error or context cancellation, wait for
emit and upload goroutines to finish before returning. Previously
uploader.Close() could race with in-flight UploadPart calls.

Remove defensive nil-FrameTable checks in compressed uploader —
compressStream guarantees non-nil FrameTable on success.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Base automatically changed from lev-paths-refactor to main April 6, 2026 20:59
levb and others added 2 commits April 6, 2026 14:12
…retry test

fetchSession: replace the close-and-recreate channel broadcast pattern
with sync.Cond. Eliminates O(advance_calls) channel allocations per
fetch — advance() is now zero-alloc.

Widen timing bounds in TestRetryableClient_ActualRetryBehavior to
prevent flakes on slow CI runners (pre-existing, not our test).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
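The sync.Cond broadcast described above might look roughly like this — the fetchSession fields and method names are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// fetchSession sketches the zero-alloc broadcast: instead of closing and
// recreating a channel on every advance, a sync.Cond wakes all waiters with
// no per-call allocation.
type fetchSession struct {
	mu       sync.Mutex
	cond     *sync.Cond
	progress int64 // bytes fetched so far
}

func newFetchSession() *fetchSession {
	s := &fetchSession{}
	s.cond = sync.NewCond(&s.mu)
	return s
}

func (s *fetchSession) advance(to int64) {
	s.mu.Lock()
	s.progress = to
	s.mu.Unlock()
	s.cond.Broadcast() // no channel allocation per advance
}

// waitFor blocks until progress reaches at least offset.
func (s *fetchSession) waitFor(offset int64) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.progress < offset {
		s.cond.Wait()
	}
	return s.progress
}

func main() {
	s := newFetchSession()
	done := make(chan int64)
	go func() { done <- s.waitFor(16 << 10) }()
	s.advance(16 << 10)
	fmt.Println(<-done) // prints "16384"
}
```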
levb and others added 6 commits April 6, 2026 16:34
Replace unbounded retry loop in File.ReadAt/Slice with maxTransitionRetries=2
counter. Remove redundant swapFailed atomic — the retry counter bounds all
failure modes (corrupt headers fail immediately, successful-but-unhelpful
swaps are capped).

Replace `type PendingBuildInfo sync.Map` (unidiomatic type alias requiring
(*sync.Map)(p) casts) with a struct wrapping sync.Mutex + plain map. The
entry count is tiny (2-8 keys) and doesn't benefit from sync.Map's sharding.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
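A minimal sketch of the struct-wrapped map replacing the sync.Map type alias — the field names and []byte value type are assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

// PendingBuildInfo: for a handful of keys (2-8 in practice), a plain map
// under a mutex is simpler than sync.Map and avoids the awkward
// (*sync.Map)(p) casts a type alias would require.
type PendingBuildInfo struct {
	mu      sync.Mutex
	entries map[string][]byte // buildID → serialized headers (illustrative)
}

func NewPendingBuildInfo() *PendingBuildInfo {
	return &PendingBuildInfo{entries: make(map[string][]byte)}
}

func (p *PendingBuildInfo) Set(id string, hdr []byte) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.entries[id] = hdr
}

func (p *PendingBuildInfo) Get(id string) ([]byte, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	h, ok := p.entries[id]
	return h, ok
}

func main() {
	p := NewPendingBuildInfo()
	p.Set("build-1", []byte{4})
	h, ok := p.Get("build-1")
	fmt.Println(ok, len(h)) // prints "true 1"
}
```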
- Replace manual strconv buffer composition in makeFrameFilename with
  fmt.Sprintf — simpler and equivalent.
- Restore retry test to main's original 200ms/300ms bounds; the widened
  2s timeout was unnecessary since the PR doesn't change retry logic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Send a single advance signal instead of closing the channel, so the
reader can only complete one 16KB step. This guarantees the late offset
is unreachable when we check — no dependency on goroutine scheduling.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lose errors

decompressingCacheReader.Close() now returns an error when the captured
compressed bytes don't match the expected frame size, instead of silently
skipping the NFS cache writeback. This surfaces frame table / header bugs
that would otherwise manifest as persistent GCS round-trips.

progressiveRead uses a named return + defer to propagate reader.Close()
errors, so the size-mismatch error reaches runFetch and is reported via
the fetch session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Subset returns the index of the first included frame, not one past it.
Update the doc to match the actual (correct) behavior: boundary frames
must appear in both subsets.

Check the error from lz4.Writer.Apply in header serialization, matching
the pattern used in the data compression path.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
)

func NewChunker(
_ context.Context,

unused

// getMinReadBatchSize returns the effective min read batch size.
// Queried per-fetch so it can be tuned via feature flags without a restart.
func (c *Chunker) getMinReadBatchSize(ctx context.Context) int64 {
if c.featureFlags != nil {

it's never nil

// When onlyIfRunning is true, it is a no-op if the session already
// terminated (used for panic recovery to avoid overriding a successful
// completion).
func (s *fetchSession) setError(err error, onlyIfRunning bool) {

let's call this unconditionally to make sure registerAndWait always exits

require.Equal(t, data[:testBlockSize], slice)
}

func TestStreamingChunker_MultiChunkSlice(t *testing.T) {

why remove some of these tests?

}

// IsEnabled reports whether compression is configured and active.
func (c *CompressConfig) IsEnabled() bool {

unify with isCompressed

return fmt.Sprintf("%d/%d", r.Start, r.Length)
}

type FrameTable struct {

add comment explaining the FrameTable, StartAt (probably rename to offset), and Frames

}

// FrameFor finds the frame containing the given offset and returns its start position and full size.
func (ft *FrameTable) FrameFor(offset int64) (starts FrameOffset, size FrameSize, err error) {

this doesn't return a frame, but two parts that compose frames, for both compressed and uncompressed

}

mappedRange := storage.Range{
Start: int64(mapping.BuildStorageOffset),

let's rename Start to Offset (we're storing the offset there)

return ft != nil && ft.compressionType != CompressionNone
}

func (ft *FrameTable) Size() (uncompressed, compressed int64) {

this might be private or not necessary at all


// File type identifiers for per-file-type compression targeting.
FileTypeMemfile = "memfile"
FileTypeRootfs = "rootfs"

this should be used

DefaultCompressFrameSize = 2 * 1024 * 1024

// File type identifiers for per-file-type compression targeting.
FileTypeMemfile = "memfile"

colocate with compress_config.go

FileTypeRootfs = "rootfs"

// Use case identifiers for per-use-case compression targeting.
UseCaseBuild = "build"

same applies here

// This MUST be multiple of every block/page size:
// - header.HugepageSize (2 MiB) — UFFD huge-page size, also used by prefetch
// - header.RootfsBlockSize (4 KiB) — NBD / rootfs block size
DefaultCompressFrameSize = 2 * 1024 * 1024

move to the compression config

}
hasher := sha256.New()

ft = &FrameTable{compressionType: cfg.CompressionType()}

let's create it at return time, and build only the frames slice here

}

// NewFrameTable creates a FrameTable with the given compression type.
func NewFrameTable(ct CompressionType) *FrameTable {

let's remove the constructor

return errors.Join(emitEG.Wait(), uploadEG.Wait())
}

part, compressCtx := newPart(1, ctx, cfg.FrameEncodeWorkers)

let's index from 0 and move the +1 into the GCP implementation

}

part, compressCtx := newPart(1, ctx, cfg.FrameEncodeWorkers)
for {

let's have a clear exit point, or make the loop bounded

case errors.Is(err, io.EOF):
case errors.Is(err, io.ErrUnexpectedEOF):
// fall through
default:

n, err := io.ReadFull(in, buf)
eofReached := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
if err != nil && !eofReached {
    pipelineErr := drainPipeline()

    return nil, [32]byte{}, errors.Join(fmt.Errorf("read frame: %w", err), pipelineErr)
}

if n > 0 {
    hasher.Write(buf[:n])
    part.addFrame(compressCtx, buf[:n], compressors)
}

if eofReached {
    break
}


Labels

feature New feature


4 participants