Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,21 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// sleep after previous iteration
RetrySleepFn(1 * time.Second)
}
// Check for abort/context cancellation before each retry
if abortErr := this.checkAbort(); abortErr != nil {
return abortErr
Comment on lines 161 to +165
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkAbort() is called only after the backoff sleep. If the migration context has already been cancelled/aborted, this loop will still sleep the full interval before returning, which can significantly delay shutdown (especially in exponential backoff scenarios). Consider checking checkAbort() before sleeping, and/or making the sleep itself context-aware (e.g., select on ctx.Done() vs a timer) so cancellation interrupts the wait.

Copilot uses AI. Check for mistakes.
}
err = operation()
if err == nil {
return nil
}
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
if strings.Contains(err.Error(), "warnings detected") {
if len(notFatalHint) == 0 {
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
}
return err
Comment on lines +172 to +176
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unrecoverable-warning detection relies on strings.Contains(err.Error(), "warnings detected"), which is brittle (string changes/wrapping can break it and unrelated errors could match). A more robust approach would be to return a dedicated error type or sentinel from the warning-checking code (e.g., in Applier.executeBatchWithWarningChecking) and detect it here via errors.Is/As (or a helper like isWarningsDetected(err)).

Copilot uses AI. Check for mistakes.
}
// there's an error. Let's try again.
}
if len(notFatalHint) == 0 {
Expand All @@ -190,10 +201,21 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
if i != 0 {
RetrySleepFn(time.Duration(interval) * time.Second)
}
// Check for abort/context cancellation before each retry
if abortErr := this.checkAbort(); abortErr != nil {
return abortErr
Comment on lines 202 to +206
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as retryOperation(): the abort check happens after the backoff sleep, so a cancelled context can still incur a long wait (up to ExponentialBackoffMaxInterval) before returning. Consider checking checkAbort() before sleeping and/or using a context-aware wait to allow immediate cancellation.

Copilot uses AI. Check for mistakes.
}
err = operation()
if err == nil {
return nil
}
// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
if strings.Contains(err.Error(), "warnings detected") {
if len(notFatalHint) == 0 {
_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
}
return err
Comment on lines +213 to +217
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same brittle warning detection as the non-exponential retry: matching on the error string is fragile. Prefer a typed/sentinel warning error (created where the warning is detected) and check via errors.Is/As (or a shared helper) to keep this behavior stable across message changes.

Copilot uses AI. Check for mistakes.
}
}
if len(notFatalHint) == 0 {
// Use helper to prevent deadlock if listenOnPanicAbort already exited
Expand Down
211 changes: 211 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,118 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
assert.Equal(t, tries, 100)
}

func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) {
	// Stub out the retry sleep and restore the original when the test ends.
	savedSleepFn := RetrySleepFn
	defer func() { RetrySleepFn = savedSleepFn }()
	RetrySleepFn = func(time.Duration) {
		// Sleeping is unnecessary for this test.
	}

	migrationContext := base.NewMigrationContext()
	migrationContext.SetDefaultNumRetries(100)
	migrator := NewMigrator(migrationContext, "1.2.3")

	attempts := 0
	failingOp := func() error {
		attempts++
		if attempts == 5 {
			// Cancel the migration context mid-retry, on the 5th attempt.
			migrationContext.CancelContext()
		}
		return errors.New("Simulated error")
	}

	result := migrator.retryOperation(failingOp, false)
	assert.Error(t, result)
	// The loop must stop right after cancellation: 5 failed attempts,
	// then checkAbort observes the cancelled context.
	assert.True(t, attempts <= 6, "Expected tries <= 6, got %d", attempts)
	// The returned error must reflect the context cancellation.
	assert.Contains(t, result.Error(), "context canceled")
}

func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) {
	// Stub out the retry sleep and restore the original when the test ends.
	savedSleepFn := RetrySleepFn
	defer func() { RetrySleepFn = savedSleepFn }()
	RetrySleepFn = func(time.Duration) {
		// Sleeping is unnecessary for this test.
	}

	migrationContext := base.NewMigrationContext()
	migrationContext.SetDefaultNumRetries(100)
	migrationContext.SetExponentialBackoffMaxInterval(42)
	migrator := NewMigrator(migrationContext, "1.2.3")

	attempts := 0
	failingOp := func() error {
		attempts++
		if attempts == 5 {
			// Cancel the migration context mid-retry, on the 5th attempt.
			migrationContext.CancelContext()
		}
		return errors.New("Simulated error")
	}

	result := migrator.retryOperationWithExponentialBackoff(failingOp, false)
	assert.Error(t, result)
	// The backoff loop must stop right after cancellation: 5 failed
	// attempts, then checkAbort observes the cancelled context.
	assert.True(t, attempts <= 6, "Expected tries <= 6, got %d", attempts)
	// The returned error must reflect the context cancellation.
	assert.Contains(t, result.Error(), "context canceled")
}

func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) {
	savedSleepFn := RetrySleepFn
	defer func() { RetrySleepFn = savedSleepFn }()
	// A warnings-detected error must short-circuit the retry loop,
	// so reaching the sleep at all is a failure.
	RetrySleepFn = func(time.Duration) {
		t.Fatal("Should not sleep/retry for warning errors")
	}

	migrationContext := base.NewMigrationContext()
	migrationContext.SetDefaultNumRetries(100)
	migrator := NewMigrator(migrationContext, "1.2.3")

	attempts := 0
	warningOp := func() error {
		attempts++
		return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
	}

	result := migrator.retryOperation(warningOp, false)
	assert.Error(t, result)
	// Exactly one invocation — warnings are treated as unrecoverable.
	assert.Equal(t, 1, attempts, "Expected exactly 1 try (no retries) for warning error")
	assert.Contains(t, result.Error(), "warnings detected")
}

func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) {
	savedSleepFn := RetrySleepFn
	defer func() { RetrySleepFn = savedSleepFn }()
	// A warnings-detected error must short-circuit the backoff loop,
	// so reaching the sleep at all is a failure.
	RetrySleepFn = func(time.Duration) {
		t.Fatal("Should not sleep/retry for warning errors")
	}

	migrationContext := base.NewMigrationContext()
	migrationContext.SetDefaultNumRetries(100)
	migrationContext.SetExponentialBackoffMaxInterval(42)
	migrator := NewMigrator(migrationContext, "1.2.3")

	attempts := 0
	warningOp := func() error {
		attempts++
		return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
	}

	result := migrator.retryOperationWithExponentialBackoff(warningOp, false)
	assert.Error(t, result)
	// Exactly one invocation — warnings are treated as unrecoverable.
	assert.Equal(t, 1, attempts, "Expected exactly 1 try (no retries) for warning error")
	assert.Contains(t, result.Error(), "warnings detected")
}

func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
ctx := context.Background()

Expand Down Expand Up @@ -1210,3 +1322,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) {
t.Fatal("Expected checkAbort to return error when context is cancelled")
}
}

// TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries is an end-to-end
// regression test: with PanicOnWarnings enabled, a duplicate-key warning that
// appears during cut-over must abort the migration promptly instead of being
// retried through a high, exponentially backed-off retry budget. A hang here
// indicates the retry loops do not short-circuit on warnings/abort.
func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() {
	ctx := context.Background()

	// Create table with email column (no unique constraint initially)
	_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName()))
	suite.Require().NoError(err)

	// Insert initial rows with unique email values - passes pre-flight validation
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
	suite.Require().NoError(err)
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName()))
	suite.Require().NoError(err)
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName()))
	suite.Require().NoError(err)

	// Verify we have 3 rows
	var count int
	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
	suite.Require().NoError(err)
	suite.Require().Equal(3, count)

	// Create postpone flag file: while it exists, gh-ost holds off cut-over,
	// giving us a window to inject conflicting data.
	tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test")
	suite.Require().NoError(err)
	defer os.RemoveAll(tmpDir)
	postponeFlagFile := filepath.Join(tmpDir, "postpone.flag")
	err = os.WriteFile(postponeFlagFile, []byte{}, 0644)
	suite.Require().NoError(err)

	// Start migration in goroutine; its result is delivered on `done`.
	done := make(chan error, 1)
	go func() {
		connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
		if err != nil {
			done <- err
			return
		}

		migrationContext := newTestMigrationContext()
		migrationContext.ApplierConnectionConfig = connectionConfig
		migrationContext.InspectorConnectionConfig = connectionConfig
		migrationContext.SetConnectionConfig("innodb")
		migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)"
		migrationContext.HeartbeatIntervalMilliseconds = 100
		migrationContext.PostponeCutOverFlagFile = postponeFlagFile
		// PanicOnWarnings makes duplicate-key warnings fatal instead of silent.
		migrationContext.PanicOnWarnings = true

		// High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted
		migrationContext.SetDefaultNumRetries(30)
		migrationContext.CutOverExponentialBackoff = true
		migrationContext.SetExponentialBackoffMaxInterval(128)

		migrator := NewMigrator(migrationContext, "0.0.0")

		//nolint:contextcheck
		done <- migrator.Migrate()
	}()

	// Wait for migration to reach postponed state
	// NOTE(review): fixed sleep is timing-sensitive and could flake on slow hosts.
	// TODO replace this with an actual check for postponed state
	time.Sleep(3 * time.Second)

	// Now insert a duplicate email value while migration is postponed
	// This simulates data arriving during migration that would violate the unique constraint
	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
	suite.Require().NoError(err)

	// Verify we now have 4 rows (including the duplicate)
	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
	suite.Require().NoError(err)
	suite.Require().Equal(4, count)

	// Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate
	err = os.Remove(postponeFlagFile)
	suite.Require().NoError(err)

	// Wait for Migrate() to return - with timeout to detect if it hangs
	select {
	case migrateErr := <-done:
		// Success - Migrate() returned
		// It should return an error due to the duplicate
		suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation")
		suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry")
	case <-time.After(5 * time.Minute):
		// 5 minutes is far less than the worst-case retry budget configured
		// above, so a timeout here means the abort path did not fire.
		suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop")
	}

	// Verify all 4 rows are still in the original table (no silent data loss)
	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
	suite.Require().NoError(err)
	suite.Require().Equal(4, count, "Original table should still have all 4 rows")

	// Verify both user1@example.com entries still exist
	var duplicateCount int
	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount)
	suite.Require().NoError(err)
	suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries")
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ERROR warnings detected in statement 1 of 1
ERROR warnings detected in statement
Loading