Skip to content

Commit 70213ce

Browse files
fix(template-manager): handle error gracefully if metadata already exists (#2130)
* fix(template-manager): handle error gracefully if metadata already exists * fix: handle rate limit * chore: add comment * chore: generalize * chore: generalize * chore: auto-commit generated changes * fix: handle error in close as well * chore: move error handling layer up --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent e65ddff commit 70213ce

3 files changed

Lines changed: 58 additions & 7 deletions

File tree

packages/orchestrator/internal/template/build/layer/layer_executor.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,16 @@ func (lb *LayerExecutor) PauseAndUpload(
312312
},
313313
})
314314
if err != nil {
315-
return fmt.Errorf("error saving UUID to hash mapping: %w", err)
315+
// A rate-limited write means another writer is racing to store the same layer metadata; since the data should be basically identical, skipping this write is safe.
316+
if !errors.Is(err, storage.ErrObjectRateLimited) {
317+
return fmt.Errorf("error saving UUID to hash mapping: %w", err)
318+
}
319+
320+
logger.L().Warn(ctx, "rate limited writing layer metadata to object, skipping",
321+
logger.WithBuildID(meta.Template.BuildID),
322+
zap.String("hash", hash),
323+
zap.Error(err),
324+
)
316325
}
317326

318327
userLogger.Debug(ctx, fmt.Sprintf("Saved: %s", meta.Template.BuildID))

packages/shared/pkg/storage/storage.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ var (
2424

2525
// ErrObjectNotExist is a sentinel error; match it with errors.Is.
// NOTE(review): presumably returned when a requested object is missing from storage — confirm against callers.
var ErrObjectNotExist = errors.New("object does not exist")
2626

27+
// ErrObjectRateLimited reports that a write was rejected by per-object mutation rate limiting,
28+
// e.g. multiple concurrent writers racing to write the same content-addressed object.
29+
var ErrObjectRateLimited = errors.New("object access rate limited")
30+
2731
type Provider string
2832

2933
const (

packages/shared/pkg/storage/storage_google.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"google.golang.org/api/option"
2121
"google.golang.org/api/option/internaloption"
2222
"google.golang.org/grpc"
23+
"google.golang.org/grpc/codes"
24+
"google.golang.org/grpc/status"
2325

2426
"github.com/e2b-dev/infra/packages/shared/pkg/consts"
2527
"github.com/e2b-dev/infra/packages/shared/pkg/env"
@@ -303,20 +305,43 @@ func (o *gcpObject) ReadAt(ctx context.Context, buff []byte, off int64) (n int,
303305
return n, err
304306
}
305307

306-
func (o *gcpObject) Put(ctx context.Context, data []byte) (e error) {
308+
func (o *gcpObject) Put(ctx context.Context, data []byte) error {
307309
timer := googleWriteTimerFactory.Begin(attribute.String(gcsOperationAttr, gcsOperationAttrWrite))
308310

309311
w := o.handle.NewWriter(ctx)
310-
defer func() {
311-
if err := w.Close(); err != nil {
312-
e = errors.Join(e, fmt.Errorf("failed to write to %q: %w", o.path, err))
313-
}
314-
}()
315312

316313
c, err := io.Copy(w, bytes.NewReader(data))
317314
if err != nil && !errors.Is(err, io.EOF) {
315+
closeErr := w.Close()
316+
if closeErr != nil {
317+
logger.L().Warn(ctx, "failed to close GCS writer after copy error",
318+
zap.String("object", o.path),
319+
zap.NamedError("error_copy", err),
320+
zap.Error(closeErr),
321+
)
322+
}
323+
324+
timer.Failure(ctx, c)
325+
326+
// ResourceExhausted from GCS means per-object mutation rate limiting —
327+
// multiple concurrent writers racing to write the same content-addressed object.
328+
if isResourceExhausted(err) {
329+
return ErrObjectRateLimited
330+
}
331+
332+
return fmt.Errorf("failed to write to %q: %w", o.path, err)
333+
}
334+
335+
// For small objects the GCS Writer buffers data in memory during Write()
336+
// and performs the actual upload during Close(). ResourceExhausted errors
337+
// from per-object mutation rate limiting will surface here.
338+
if err := w.Close(); err != nil {
318339
timer.Failure(ctx, c)
319340

341+
if isResourceExhausted(err) {
342+
return ErrObjectRateLimited
343+
}
344+
320345
return fmt.Errorf("failed to write to %q: %w", o.path, err)
321346
}
322347

@@ -470,3 +495,16 @@ func parseServiceAccountBase64(serviceAccount string) (*gcpServiceToken, error)
470495

471496
return &sa, nil
472497
}
498+
499+
func isResourceExhausted(err error) bool {
500+
type grpcStatusProvider interface {
501+
GRPCStatus() *status.Status
502+
}
503+
504+
var se grpcStatusProvider
505+
if errors.As(err, &se) {
506+
return se.GRPCStatus().Code() == codes.ResourceExhausted
507+
}
508+
509+
return false
510+
}

0 commit comments

Comments
 (0)