…ning

- Use header.HugepageSize for uncompressed fetch alignment (semantically correct)
- Stream NFS cache hits directly into ReadFrame instead of buffering in memory
- Fix timer placement to cover the full GetFrame (read + decompression)
- Fix onRead callback: nil for compressed inner calls (prevents double-invoke), pass through for uncompressed (bytes are final)
- Remove panic recovery from runFetch (never in main)
- Remove low-value chunker tests subsumed by ConcurrentStress
- Remove 4MB frame configs from benchmarks (targeting 2MB only)
- Remove unused readCacheFile function

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ble NFS cache
- Remove dead flagsClient chain through chunker/build/template layers (~15 files)
- Delete ChunkerConfigFlag (unused after flagsClient removal)
- Delete mock_flagsclient_test.go
- Simplify GetUploadOptions: remove redundant intOr/strOr fallbacks (flags have defaults)
- Add GetCompressionType helper to frame_table.go, deduplicate compression type extraction
- Replace [16]byte{} with uuid.Nil and "rootfs.ext4" with storage.RootfsName in inspect-build
- Simplify UploadV4Header return pattern
- Remove onRead callback from legacy fullFetchChunker (FullFetch should not use progressive reads)
- Re-enable NFS cache in template cache.go
- Remove all fmt.Printf debug instrumentation from orchestrator
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sionType threading
Add per-build file size and SHA-256 checksum to V4 headers, eliminating
the redundant Size() network call when opening upstream data files on
the read path. Checksums are computed for free by piggybacking on
CompressStream's existing frame iteration.
Remove the separate compressionType parameter threaded through
getBuild → newStorageDiff → NewChunker; the read path now derives
compression state from the per-mapping FrameTable directly.
V4 binary format change (not yet deployed):
[Metadata] [LZ4: numBuilds, builds(uuid+size+checksum),
numMappings, mappings...]
V3 path unchanged — falls back to Size() call when size is unknown.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Merge writeFrameToCache and writeChunkToCache into a unified writeToCache with lock + atomic rename, used by all three cache write paths
- Fix file descriptor leak in cache hit paths: defer f.Close() and wrap in NopCloser so ReadFrame's close doesn't double-close the fd
- Add defer uploader.Close() in CompressStream so PartUploader file handles are released on error paths between Start() and Complete()
- Make Close() idempotent via sync.Once on fsPartUploader and filePartWriter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… lev-compression-final
The SHA-256 checksum in BuildFileInfo now covers uncompressed data, making it useful for end-to-end integrity verification of the original content. Updated inspect-build to use SHA-256 (replacing MD5) and verify checksums against the header. Fixed early-return lint warnings. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
GetUploadOptions now accepts fileType and useCase parameters, enriching the LD evaluation context so dashboard targeting rules can differentiate (e.g. compress memfile but not rootfs, or builds but not pauses). TemplateBuild accepts per-file opts directly instead of holding an ff reference. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…leanly

Close both the decompressor and raw stream before deciding whether to write back to NFS cache. Previously a partial-read scenario could cache incomplete compressed data if the decompressor failed to drain. Also eliminates an unnecessary buffer copy by transferring ownership of the backing array to the background writeback goroutine.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace SetFrames (which always scans from frame 0) with SetFramesFrom in all mapping split paths. In the "diff inside base" case, the left split returns a cursor that the right split continues from, avoiding O(N²) rescanning of the base FrameTable.

Add table-driven tests covering FrameTable propagation through all split paths, including multi-layer headers with three compressed builds. Each test case also verifies the read-path invariant: FrameFor must resolve both the first and last byte of every mapping's storage range.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
… lev-compression-final
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove unused methods, functions, and constants identified during code review. Rename SetFramesFrom→SetFrames and SubsetFrom→Subset now that their shorter-named wrappers are gone. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…nfig logging

The integration_tests.yml workflow referenced inputs.compression, which was never defined, so artifact name prefixes never activated. Removed, since there's a single test run (always LZ4). Added compression config logging to the build and pause upload paths so the active config is visible in logs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- CompressionType OTEL attribute used string(byte), producing "\x01" instead of "zstd"; use the .String() method
- FS backend leaked the raw reader when newDecompressingReadCloser failed; add raw.Close() on error (GCS already handled this correctly)
- Write timer leaked on the compressed upload path; record Success/Failure after storeFileCompressed returns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…v-compression-final
…d cache metrics

- Restore cache.go to match main (identical to e2b/main)
- Use sliceDirect after fetch to avoid TOCTOU with coarse dirty granularity
- Defer setIsCached to runFetch after the full chunk is written
- Restore CI workflow integration_tests.yml formatting (keep compression settings)
- Minimize storage_cache_metrics.go diff vs base

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…hecks

compressStream: on read-loop error or context cancellation, wait for the emit and upload goroutines to finish before returning. Previously uploader.Close() could race with in-flight UploadPart calls.

Remove defensive nil-FrameTable checks in the compressed uploader — compressStream guarantees a non-nil FrameTable on success.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…retry test

fetchSession: replace the close-and-recreate channel broadcast pattern with sync.Cond. Eliminates O(advance_calls) channel allocations per fetch — advance() is now zero-alloc.

Widen timing bounds in TestRetryableClient_ActualRetryBehavior to prevent flakes on slow CI runners (a pre-existing flake, not introduced by this PR).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace the unbounded retry loop in File.ReadAt/Slice with a maxTransitionRetries=2 counter. Remove the redundant swapFailed atomic — the retry counter bounds all failure modes (corrupt headers fail immediately; successful-but-unhelpful swaps are capped).

Replace `type PendingBuildInfo sync.Map` (an unidiomatic type alias requiring (*sync.Map)(p) casts) with a struct wrapping sync.Mutex + a plain map. The entry count is tiny (2-8 keys) and doesn't benefit from sync.Map's sharding.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace manual strconv buffer composition in makeFrameFilename with fmt.Sprintf — simpler and equivalent.
- Restore the retry test to main's original 200ms/300ms bounds; the widened 2s timeout was unnecessary since the PR doesn't change retry logic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Send a single advance signal instead of closing the channel, so the reader can only complete one 16KB step. This guarantees the late offset is unreachable when we check — no dependency on goroutine scheduling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…lose errors

decompressingCacheReader.Close() now returns an error when the captured compressed bytes don't match the expected frame size, instead of silently skipping the NFS cache writeback. This surfaces frame table / header bugs that would otherwise manifest as persistent GCS round-trips.

progressiveRead uses a named return + defer to propagate reader.Close() errors, so the size-mismatch error reaches runFetch and is reported via the fetch session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Subset returns the index of the first included frame, not one past it. Update the doc to match the actual (correct) behavior: boundary frames must appear in both subsets. Check the error from lz4.Writer.Apply in header serialization, matching the pattern used in the data compression path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```go
)

func NewChunker(
	_ context.Context,
```
```go
// getMinReadBatchSize returns the effective min read batch size.
// Queried per-fetch so it can be tuned via feature flags without a restart.
func (c *Chunker) getMinReadBatchSize(ctx context.Context) int64 {
	if c.featureFlags != nil {
```
```go
// When onlyIfRunning is true, it is a no-op if the session already
// terminated (used for panic recovery to avoid overriding a successful
// completion).
func (s *fetchSession) setError(err error, onlyIfRunning bool) {
```
Let's call this unconditionally to always make sure registerAndWait exits.
```go
	require.Equal(t, data[:testBlockSize], slice)
}

func TestStreamingChunker_MultiChunkSlice(t *testing.T) {
```
Why are we removing some of these tests?
```go
}

// IsEnabled reports whether compression is configured and active.
func (c *CompressConfig) IsEnabled() bool {
```
```go
	return fmt.Sprintf("%d/%d", r.Start, r.Length)
}

type FrameTable struct {
```
Add a comment explaining the FrameTable, StartAt (probably rename to offset), and Frames.
```go
}

// FrameFor finds the frame containing the given offset and returns its start position and full size.
func (ft *FrameTable) FrameFor(offset int64) (starts FrameOffset, size FrameSize, err error) {
```
This doesn't return a frame, but rather two parts that compose frames, for compressed and uncompressed data.
```go
}

mappedRange := storage.Range{
	Start: int64(mapping.BuildStorageOffset),
```
Let's rename Start to Offset (we're storing the offset there).
```go
	return ft != nil && ft.compressionType != CompressionNone
}

func (ft *FrameTable) Size() (uncompressed, compressed int64) {
```
This might be private, or not necessary at all.
```go
// File type identifiers for per-file-type compression targeting.
FileTypeMemfile = "memfile"
FileTypeRootfs = "rootfs"
```
```go
DefaultCompressFrameSize = 2 * 1024 * 1024

// File type identifiers for per-file-type compression targeting.
FileTypeMemfile = "memfile"
```
Colocate with compress_config.go.
```go
FileTypeRootfs = "rootfs"

// Use case identifiers for per-use-case compression targeting.
UseCaseBuild = "build"
```
```go
// This MUST be multiple of every block/page size:
// - header.HugepageSize (2 MiB) — UFFD huge-page size, also used by prefetch
// - header.RootfsBlockSize (4 KiB) — NBD / rootfs block size
DefaultCompressFrameSize = 2 * 1024 * 1024
```
Move to the compression config.
```go
}
hasher := sha256.New()

ft = &FrameTable{compressionType: cfg.CompressionType()}
```
Let's create it at return time, and build just the frames slice here.
```go
}

// NewFrameTable creates a FrameTable with the given compression type.
func NewFrameTable(ct CompressionType) *FrameTable {
```
```go
	return errors.Join(emitEG.Wait(), uploadEG.Wait())
}

part, compressCtx := newPart(1, ctx, cfg.FrameEncodeWorkers)
```
Let's index from 0 and move the +1 to the GCP implementation.
```go
}

part, compressCtx := newPart(1, ctx, cfg.FrameEncodeWorkers)
for {
```
Let's have a clear exit point, or make it bounded.
```go
case errors.Is(err, io.EOF):
case errors.Is(err, io.ErrUnexpectedEOF):
	// fall through
default:
```
```go
n, err := io.ReadFull(in, buf)
eofReached := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
if err != nil && !eofReached {
	pipelineErr := drainPipeline()
	return nil, [32]byte{}, errors.Join(fmt.Errorf("read frame: %w", err), pipelineErr)
}
if n > 0 {
	hasher.Write(buf[:n])
	part.addFrame(compressCtx, buf[:n], compressors)
}
if eofReached {
	break
}
```
Summary
Compression for data files (memfile, rootfs). Files are broken into independently decompressible frames (2 MiB, zstd), stored in GCS alongside V4 headers with per-mapping frame tables. Fully backward-compatible: the read path auto-detects V3/V4 headers and routes compressed vs uncompressed reads per-mapping. Gated by compress-config LaunchDarkly flag (per-team/cluster/template targeting).
What changed
- `FramedFile` interface replaces `Seekable` — a unified `GetFrame(ctx, offset, frameTable, decompress, buf, readSize, onRead)` handles both compressed and uncompressed data
- `FrameTable` per mapping + `BuildFileInfo` (uncompressed size, SHA-256 checksum) per build; LZ4-block-compressed header blob
- `Chunker` with mmap cache and fetch-session dedupe, replacing streaming_chunk.go

Read path
P2P header switchover
Benchmark results
End-to-end pause/resume
(BenchmarkBaseImage, 50 iterations, local disk):
Full architecture doc: docs/compression-architecture.md