diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index e3e6d429d..f32d859b8 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -160,10 +160,21 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
 			// sleep after previous iteration
 			RetrySleepFn(1 * time.Second)
 		}
+		// Check for abort/context cancellation before each retry
+		if abortErr := this.checkAbort(); abortErr != nil {
+			return abortErr
+		}
 		err = operation()
 		if err == nil {
 			return nil
 		}
+		// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
+		if strings.Contains(err.Error(), "warnings detected") {
+			if len(notFatalHint) == 0 {
+				_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
+			}
+			return err
+		}
 		// there's an error. Let's try again.
 	}
 	if len(notFatalHint) == 0 {
@@ -190,10 +201,21 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
 		if i != 0 {
 			RetrySleepFn(time.Duration(interval) * time.Second)
 		}
+		// Check for abort/context cancellation before each retry
+		if abortErr := this.checkAbort(); abortErr != nil {
+			return abortErr
+		}
 		err = operation()
 		if err == nil {
 			return nil
 		}
+		// Check if this is an unrecoverable error (data consistency issues won't resolve on retry)
+		if strings.Contains(err.Error(), "warnings detected") {
+			if len(notFatalHint) == 0 {
+				_ = base.SendWithContext(this.migrationContext.GetContext(), this.migrationContext.PanicAbort, err)
+			}
+			return err
+		}
 	}
 	if len(notFatalHint) == 0 {
 		// Use helper to prevent deadlock if listenOnPanicAbort already exited
diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go
index 7b02c6b3f..f731035e1 100644
--- a/go/logic/migrator_test.go
+++ b/go/logic/migrator_test.go
@@ -714,6 +714,118 @@ func TestMigratorRetryWithExponentialBackoff(t *testing.T) {
 	assert.Equal(t, tries, 100)
 }
 
+func TestMigratorRetryAbortsOnContextCancellation(t *testing.T) {
+	oldRetrySleepFn := RetrySleepFn
+	defer func() { RetrySleepFn = oldRetrySleepFn }()
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.SetDefaultNumRetries(100)
+	migrator := NewMigrator(migrationContext, "1.2.3")
+
+	RetrySleepFn = func(duration time.Duration) {
+		// No sleep needed for this test
+	}
+
+	var tries = 0
+	retryable := func() error {
+		tries++
+		if tries == 5 {
+			// Cancel context on 5th try
+			migrationContext.CancelContext()
+		}
+		return errors.New("Simulated error")
+	}
+
+	result := migrator.retryOperation(retryable, false)
+	assert.Error(t, result)
+	// Should abort after 6 tries: 5 failures + 1 checkAbort detection
+	assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
+	// Verify we got context cancellation error
+	assert.Contains(t, result.Error(), "context canceled")
+}
+
+func TestMigratorRetryWithExponentialBackoffAbortsOnContextCancellation(t *testing.T) {
+	oldRetrySleepFn := RetrySleepFn
+	defer func() { RetrySleepFn = oldRetrySleepFn }()
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.SetDefaultNumRetries(100)
+	migrationContext.SetExponentialBackoffMaxInterval(42)
+	migrator := NewMigrator(migrationContext, "1.2.3")
+
+	RetrySleepFn = func(duration time.Duration) {
+		// No sleep needed for this test
+	}
+
+	var tries = 0
+	retryable := func() error {
+		tries++
+		if tries == 5 {
+			// Cancel context on 5th try
+			migrationContext.CancelContext()
+		}
+		return errors.New("Simulated error")
+	}
+
+	result := migrator.retryOperationWithExponentialBackoff(retryable, false)
+	assert.Error(t, result)
+	// Should abort after 6 tries: 5 failures + 1 checkAbort detection
+	assert.True(t, tries <= 6, "Expected tries <= 6, got %d", tries)
+	// Verify we got context cancellation error
+	assert.Contains(t, result.Error(), "context canceled")
+}
+
+func TestMigratorRetrySkipsRetriesForWarnings(t *testing.T) {
+	oldRetrySleepFn := RetrySleepFn
+	defer func() { RetrySleepFn = oldRetrySleepFn }()
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.SetDefaultNumRetries(100)
+	migrator := NewMigrator(migrationContext, "1.2.3")
+
+	RetrySleepFn = func(duration time.Duration) {
+		t.Fatal("Should not sleep/retry for warning errors")
+	}
+
+	var tries = 0
+	retryable := func() error {
+		tries++
+		return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
+	}
+
+	result := migrator.retryOperation(retryable, false)
+	assert.Error(t, result)
+	// Should only try once - no retries for warnings
+	assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
+	assert.Contains(t, result.Error(), "warnings detected")
+}
+
+func TestMigratorRetryWithExponentialBackoffSkipsRetriesForWarnings(t *testing.T) {
+	oldRetrySleepFn := RetrySleepFn
+	defer func() { RetrySleepFn = oldRetrySleepFn }()
+
+	migrationContext := base.NewMigrationContext()
+	migrationContext.SetDefaultNumRetries(100)
+	migrationContext.SetExponentialBackoffMaxInterval(42)
+	migrator := NewMigrator(migrationContext, "1.2.3")
+
+	RetrySleepFn = func(duration time.Duration) {
+		t.Fatal("Should not sleep/retry for warning errors")
+	}
+
+	var tries = 0
+	retryable := func() error {
+		tries++
+		return errors.New("warnings detected in statement 1 of 1: [Warning: Duplicate entry 'test' for key 'idx' (1062)]")
+	}
+
+	result := migrator.retryOperationWithExponentialBackoff(retryable, false)
+	assert.Error(t, result)
+	// Should only try once - no retries for warnings
+	assert.Equal(t, 1, tries, "Expected exactly 1 try (no retries) for warning error")
+	assert.Contains(t, result.Error(), "warnings detected")
+}
+
 func (suite *MigratorTestSuite) TestCutOverLossDataCaseLockGhostBeforeRename() {
 	ctx := context.Background()
 
@@ -1210,3 +1322,102 @@ func TestCheckAbort_DetectsContextCancellation(t *testing.T) {
 		t.Fatal("Expected checkAbort to return error when context is cancelled")
 	}
 }
+
+func (suite *MigratorTestSuite) TestPanicOnWarningsDuplicateDuringCutoverWithHighRetries() {
+	ctx := context.Background()
+
+	// Create table with email column (no unique constraint initially)
+	_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY AUTO_INCREMENT, email VARCHAR(100))", getTestTableName()))
+	suite.Require().NoError(err)
+
+	// Insert initial rows with unique email values - passes pre-flight validation
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
+	suite.Require().NoError(err)
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user2@example.com')", getTestTableName()))
+	suite.Require().NoError(err)
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user3@example.com')", getTestTableName()))
+	suite.Require().NoError(err)
+
+	// Verify we have 3 rows
+	var count int
+	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
+	suite.Require().NoError(err)
+	suite.Require().Equal(3, count)
+
+	// Create postpone flag file
+	tmpDir, err := os.MkdirTemp("", "gh-ost-postpone-test")
+	suite.Require().NoError(err)
+	defer os.RemoveAll(tmpDir)
+	postponeFlagFile := filepath.Join(tmpDir, "postpone.flag")
+	err = os.WriteFile(postponeFlagFile, []byte{}, 0644)
+	suite.Require().NoError(err)
+
+	// Start migration in goroutine
+	done := make(chan error, 1)
+	go func() {
+		connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
+		if err != nil {
+			done <- err
+			return
+		}
+
+		migrationContext := newTestMigrationContext()
+		migrationContext.ApplierConnectionConfig = connectionConfig
+		migrationContext.InspectorConnectionConfig = connectionConfig
+		migrationContext.SetConnectionConfig("innodb")
+		migrationContext.AlterStatementOptions = "ADD UNIQUE KEY unique_email_idx (email)"
+		migrationContext.HeartbeatIntervalMilliseconds = 100
+		migrationContext.PostponeCutOverFlagFile = postponeFlagFile
+		migrationContext.PanicOnWarnings = true
+
+		// High retry count + exponential backoff means retries will take a long time and fail the test if not properly aborted
+		migrationContext.SetDefaultNumRetries(30)
+		migrationContext.CutOverExponentialBackoff = true
+		migrationContext.SetExponentialBackoffMaxInterval(128)
+
+		migrator := NewMigrator(migrationContext, "0.0.0")
+
+		//nolint:contextcheck
+		done <- migrator.Migrate()
+	}()
+
+	// Wait for migration to reach postponed state
+	// TODO replace this with an actual check for postponed state
+	time.Sleep(3 * time.Second)
+
+	// Now insert a duplicate email value while migration is postponed
+	// This simulates data arriving during migration that would violate the unique constraint
+	_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (email) VALUES ('user1@example.com')", getTestTableName()))
+	suite.Require().NoError(err)
+
+	// Verify we now have 4 rows (including the duplicate)
+	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
+	suite.Require().NoError(err)
+	suite.Require().Equal(4, count)
+
+	// Unpostpone the migration - gh-ost will now try to apply binlog events with the duplicate
+	err = os.Remove(postponeFlagFile)
+	suite.Require().NoError(err)
+
+	// Wait for Migrate() to return - with timeout to detect if it hangs
+	select {
+	case migrateErr := <-done:
+		// Success - Migrate() returned
+		// It should return an error due to the duplicate
+		suite.Require().Error(migrateErr, "Expected migration to fail due to duplicate key violation")
+		suite.Require().Contains(migrateErr.Error(), "Duplicate entry", "Error should mention duplicate entry")
+	case <-time.After(5 * time.Minute):
+		suite.FailNow("Migrate() hung and did not return within 5 minutes - failure to abort on warnings in retry loop")
+	}
+
+	// Verify all 4 rows are still in the original table (no silent data loss)
+	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", getTestTableName())).Scan(&count)
+	suite.Require().NoError(err)
+	suite.Require().Equal(4, count, "Original table should still have all 4 rows")
+
+	// Verify both user1@example.com entries still exist
+	var duplicateCount int
+	err = suite.db.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE email = 'user1@example.com'", getTestTableName())).Scan(&duplicateCount)
+	suite.Require().NoError(err)
+	suite.Require().Equal(2, duplicateCount, "Should have 2 duplicate email entries")
+}
diff --git a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure
index fb8dc562a..5a6e5411e 100644
--- a/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure
+++ b/localtests/panic-on-warnings-update-pk-with-duplicate-on-new-unique-index/expect_failure
@@ -1 +1 @@
-ERROR warnings detected in statement 1 of 1
+ERROR warnings detected in statement