Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/db/pkg/testutils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -106,6 +107,12 @@ func SetupDatabase(t *testing.T) *Database {
}
}

// gooseMu serializes goose operations across parallel tests.
//
// goose.OpenDBWithDriver calls goose.SetDialect, which writes to package-level
// globals (dialect, store) without synchronization. Concurrent test goroutines
// race on these globals, triggering the race detector on ARM64.
// runDatabaseMigrations acquires this mutex before calling
// goose.OpenDBWithDriver and releases it via defer when it returns.
var gooseMu sync.Mutex

// runDatabaseMigrations executes all required database migrations
func runDatabaseMigrations(t *testing.T, connStr string) {
t.Helper()
Expand All @@ -115,6 +122,9 @@ func runDatabaseMigrations(t *testing.T, connStr string) {
require.NoError(t, err, "Failed to find git root")
repoRoot := strings.TrimSpace(string(output))

gooseMu.Lock()
defer gooseMu.Unlock()

db, err := goose.OpenDBWithDriver("pgx", connStr)
require.NoError(t, err)
t.Cleanup(func() {
Expand Down
18 changes: 13 additions & 5 deletions packages/envd/internal/services/legacy/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package legacy

import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -22,12 +23,19 @@ import (
func TestFilesystemClient_FieldFormatter(t *testing.T) {
t.Parallel()
fsh := filesystemconnectmocks.NewMockFilesystemHandler(t)
fsh.EXPECT().Move(mock.Anything, mock.Anything).Return(connect.NewResponse(&filesystem.MoveResponse{
Entry: &filesystem.EntryInfo{
Name: "test-name",
Owner: "new-extra-field",
// Use RunAndReturn to create a fresh response per call. Using Return()
// shares one Response across parallel subtests, causing a data race on
// the lazily-initialized header/trailer maps inside connect.Response.
fsh.EXPECT().Move(mock.Anything, mock.Anything).RunAndReturn(
func(_ context.Context, _ *connect.Request[filesystem.MoveRequest]) (*connect.Response[filesystem.MoveResponse], error) {
return connect.NewResponse(&filesystem.MoveResponse{
Entry: &filesystem.EntryInfo{
Name: "test-name",
Owner: "new-extra-field",
},
}), nil
},
}), nil)
)

_, handler := filesystemconnect.NewFilesystemHandler(fsh,
connect.WithInterceptors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type Candidate struct {
}

type statReq struct {
df *os.File
dirPath string
name string
response chan *statReq
f *File
Expand Down
9 changes: 6 additions & 3 deletions packages/orchestrator/cmd/clean-nfs-cache/cleaner/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (c *Cleaner) Statter(ctx context.Context, done *sync.WaitGroup) {
case <-ctx.Done():
return
case req := <-c.statRequestCh:
f, err := c.statInDir(req.df, req.name)
f, err := c.statInDir(req.dirPath, req.name)
req.f = f
req.err = err
req.response <- req
Expand Down Expand Up @@ -201,13 +201,16 @@ func (c *Cleaner) scanDir(ctx context.Context, path []*Dir) (out *Dir, err error
}
}

// submit all stat requests
// Submit stat requests using the directory path (not the *os.File).
// The file descriptor df is closed when scanDir returns (defer above),
// but Statter goroutines may still be processing requests concurrently.
// Passing the path avoids a race between df.Close() and df.Fd().
responseCh := make(chan *statReq, len(filenames))
for _, name := range filenames {
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.statRequestCh <- &statReq{df: df, name: name, response: responseCh}:
case c.statRequestCh <- &statReq{dirPath: absPath, name: name, response: responseCh}:
// submitted
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package cleaner

import (
"fmt"
"os"
"path/filepath"

"golang.org/x/sys/unix"
)
Expand All @@ -30,17 +30,18 @@ func (c *Cleaner) stat(fullPath string) (*Candidate, error) {
}, nil
}

func (c *Cleaner) statInDir(df *os.File, filename string) (*File, error) {
func (c *Cleaner) statInDir(dirPath string, filename string) (*File, error) {
c.StatxC.Add(1)
c.StatxInDirC.Add(1)
var statx unix.Statx_t
err := unix.Statx(int(df.Fd()), filename,
fullPath := filepath.Join(dirPath, filename)
err := unix.Statx(unix.AT_FDCWD, fullPath,
unix.AT_STATX_DONT_SYNC|unix.AT_SYMLINK_NOFOLLOW|unix.AT_NO_AUTOMOUNT,
unix.STATX_ATIME|unix.STATX_SIZE,
&statx,
Comment on lines +33 to 41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 The statInDir error message in stat_linux.go still formats only the bare filename instead of the full path filepath.Join(dirPath, filename) that is already computed on line 37 for the syscall. On NFS errors like ESTALE or ENOENT, operators see only "failed to statx \"abc123.squashfs\": stale file handle" with no directory context, making root-cause analysis harder. Fix: replace filename with filepath.Join(dirPath, filename) in the fmt.Errorf call.

Extended reasoning...

What the bug is

After this PR's refactor, statInDir in stat_linux.go takes dirPath string instead of df *os.File. Line 37 explicitly computes the full path as filepath.Join(dirPath, filename) to pass to unix.Statx. However, the error message on line 43 was not updated and still uses only the short filename:

return nil, fmt.Errorf("failed to statx %q: %w", filename, err)

The specific inconsistency

The sibling stat() function correctly uses fullPath in its error message:

return nil, fmt.Errorf("failed to statx %q: %w", fullPath, err)

The full path filepath.Join(dirPath, filename) is computed two lines above the error check but not reused in the diagnostic.

Why existing code doesn't prevent it

Before this PR, statInDir took df *os.File and called unix.Statx(int(df.Fd()), filename, ...) — the full path was not a named local variable (though available via filepath.Join(df.Name(), filename)). The old error message logging only filename was a pre-existing omission. This PR changes the signature specifically to pass the directory path as a string, and computes filepath.Join(dirPath, filename) explicitly, but leaves the error format unchanged.

Impact

When NFS-related errors occur (ESTALE, ENOENT, EIO), the log line reads: failed to statx "abc123.squashfs": stale file handle. Without the directory context, an operator cannot determine which NFS mount or subdirectory is affected — particularly problematic in large NFS caches with many subdirectories containing files of the same name.

How to fix

On line 43 of stat_linux.go, replace filename with filepath.Join(dirPath, filename):

return nil, fmt.Errorf("failed to statx %q: %w", filepath.Join(dirPath, filename), err)

Step-by-step proof

  1. scanDir computes absPath (e.g., /nfs/cache/subdir) and sends statReq{dirPath: absPath, name: "abc123.squashfs"}.
  2. Statter calls statInDir("/nfs/cache/subdir", "abc123.squashfs").
  3. Line 37 computes filepath.Join("/nfs/cache/subdir", "abc123.squashfs") = "/nfs/cache/subdir/abc123.squashfs" and passes it to unix.Statx.
  4. If the file is stale, err = ESTALE.
  5. Line 43 formats: "failed to statx \"abc123.squashfs\": stale file handle" — directory context /nfs/cache/subdir is absent.
  6. With the fix, the error reads: "failed to statx \"/nfs/cache/subdir/abc123.squashfs\": stale file handle" — fully actionable.

Addressing the pre-existing objection

One verifier correctly notes that this is pre-existing behavior — the error message used only filename both before and after this PR, so no regression is introduced. For that reason it is classified as a nit rather than normal severity. However, this PR is the ideal time to fix it: it changes the function signature specifically to make dirPath available, it already computes the full path on line 37, and the one-line fix makes the diagnostic consistent with the sibling stat() function.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — statInDir now stores the full path in a variable and uses it in both the Statx call and the error message.

)
if err != nil {
return nil, fmt.Errorf("failed to statx %q: %w", filename, err)
return nil, fmt.Errorf("failed to statx %q: %w", fullPath, err)
}

return &File{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func (c *Cleaner) stat(path string) (*Candidate, error) {
}, nil
}

func (c *Cleaner) statInDir(df *os.File, filename string) (*File, error) {
func (c *Cleaner) statInDir(dirPath string, filename string) (*File, error) {
c.StatxInDirC.Add(1)
// performance on OS X doeas not matter, so just use the full stat
cand, err := c.stat(filepath.Join(df.Name(), filename))
// performance on OS X does not matter, so just use the full stat
cand, err := c.stat(filepath.Join(dirPath, filename))
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions packages/shared/pkg/storage/gcp_multipart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestMultipartUploader_UploadFileInParallel_Success(t *testing.T) {

var uploadID string
var initiateCount, uploadPartCount, completeCount int32
receivedParts := make(map[int]string)
receivedParts := sync.Map{}

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
Expand All @@ -194,7 +194,7 @@ func TestMultipartUploader_UploadFileInParallel_Success(t *testing.T) {
// Upload part
partNum := atomic.AddInt32(&uploadPartCount, 1)
body, _ := io.ReadAll(r.Body)
receivedParts[int(partNum)] = string(body)
receivedParts.Store(int(partNum), string(body))

w.Header().Set("ETag", fmt.Sprintf(`"etag%d"`, partNum))
w.WriteHeader(http.StatusOK)
Expand All @@ -217,7 +217,9 @@ func TestMultipartUploader_UploadFileInParallel_Success(t *testing.T) {
// Verify all parts were uploaded and content matches
var reconstructed strings.Builder
for i := 1; i <= int(atomic.LoadInt32(&uploadPartCount)); i++ {
reconstructed.WriteString(receivedParts[i])
if part, ok := receivedParts.Load(i); ok {
reconstructed.WriteString(part.(string))
}
}
require.Equal(t, testContent, reconstructed.String())
}
Expand Down Expand Up @@ -655,6 +657,7 @@ func TestMultipartUploader_BoundaryConditions_ExactChunkSize(t *testing.T) {
require.NoError(t, err)

var partSizes []int
var partSizesMu sync.Mutex

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
Expand All @@ -670,7 +673,9 @@ func TestMultipartUploader_BoundaryConditions_ExactChunkSize(t *testing.T) {

case strings.Contains(r.URL.RawQuery, "partNumber"):
body, _ := io.ReadAll(r.Body)
partSizesMu.Lock()
partSizes = append(partSizes, len(body))
partSizesMu.Unlock()

partNum := strings.Split(strings.Split(r.URL.RawQuery, "partNumber=")[1], "&")[0]
w.Header().Set("ETag", fmt.Sprintf(`"boundary-etag-%s"`, partNum))
Expand Down Expand Up @@ -904,10 +909,13 @@ func TestRetryableClient_ActualRetryBehavior(t *testing.T) {
var requestCount int32
var retryDelays []time.Duration
var retryTimes []time.Time
var retryMu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
count := atomic.AddInt32(&requestCount, 1)
retryMu.Lock()
retryTimes = append(retryTimes, time.Now())
retryMu.Unlock()

if count < 3 {
w.WriteHeader(http.StatusInternalServerError)
Expand Down
29 changes: 19 additions & 10 deletions packages/shared/pkg/utils/errorcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"context"
"errors"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -49,23 +50,31 @@ func TestErrorCollector(t *testing.T) {

ec := NewErrorCollector(1)

// Block the collector's only slot
// Block the collector's only slot.
// ctx1 and ctx2 must be distinct variables: the closure passed to ec.Go
// captures the context variable by reference. If we reused a single "ctx"
// variable, the first closure's <-ctx.Done() would race with the main
// goroutine's reassignment of ctx on the second WithCancel call.
started := make(chan struct{})
ctx, cancel1 := context.WithCancel(t.Context())
ec.Go(ctx, func() error {
ctx1, cancel1 := context.WithCancel(t.Context())
ec.Go(ctx1, func() error {
close(started)
<-ctx.Done()
<-ctx1.Done()

return nil
})

<-started

// This Go call should block on the semaphore
var wasCalled bool
ctx, cancel2 := context.WithCancel(t.Context())
ec.Go(ctx, func() error {
wasCalled = true
// This Go call should block on the semaphore.
// wasCalled must be atomic: the goroutine spawned by ec.Go may write it
// concurrently with the main goroutine's read in assert.False below.
// A plain bool causes a data race that the -race detector catches on ARM64
// (weaker memory model) even though it appears safe on x86.
var wasCalled atomic.Bool
ctx2, cancel2 := context.WithCancel(t.Context())
ec.Go(ctx2, func() error {
wasCalled.Store(true)

return nil
})
Expand All @@ -78,6 +87,6 @@ func TestErrorCollector(t *testing.T) {

err := ec.Wait()
require.ErrorIs(t, err, context.Canceled)
assert.False(t, wasCalled)
assert.False(t, wasCalled.Load())
})
}
Loading