diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml index beb99182..e8044d5c 100644 --- a/.github/workflows/api.yml +++ b/.github/workflows/api.yml @@ -37,6 +37,21 @@ jobs: - name: Wait for services to be healthy working-directory: ./tests run: | + echo "Waiting for sqld to be healthy..." + for i in $(seq 1 20); do + if curl -sf http://localhost:8090/health >/dev/null 2>&1; then + echo "sqld is healthy!" + break + fi + if [ $i -eq 20 ]; then + echo "sqld failed to become healthy" + docker compose logs sqld + exit 1 + fi + echo "sqld attempt $i/20 - waiting 3s..." + sleep 3 + done + echo "Waiting for API to be healthy..." for i in $(seq 1 40); do if docker compose exec api curl -sf http://localhost:8000/health >/dev/null 2>&1; then diff --git a/api/go.mod b/api/go.mod index 0c6ccd0b..d77c0bf6 100644 --- a/api/go.mod +++ b/api/go.mod @@ -44,6 +44,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 github.com/thedevsaddam/govalidator v1.9.10 + github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 go.opentelemetry.io/otel v1.43.0 @@ -93,11 +94,13 @@ require ( github.com/PuerkitoBio/goquery v1.12.0 // indirect github.com/andybalholm/brotli v1.2.1 // indirect github.com/andybalholm/cascadia v1.3.3 // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.11.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect + github.com/coder/websocket v1.8.12 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect github.com/fatih/color v1.19.0 // indirect @@ -183,6 +186,7 @@ require ( go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.50.0 // indirect + golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect diff --git a/api/go.sum b/api/go.sum index 5af5496f..0710b094 100644 --- a/api/go.sum +++ b/api/go.sum @@ -66,6 +66,8 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= +github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= +github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k= github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -88,6 +90,8 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw= github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -320,6 +324,8 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90= github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44= github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ= +github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0= +github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU= github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U= github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -410,6 +416,8 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 2ecfa04d..ce6a82e2 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -3,6 +3,7 @@ package di import ( "context" "crypto/tls" + "database/sql" "fmt" "net/http" "os" @@ -82,6 +83,7 @@ type Container struct { projectID string db *gorm.DB dedicatedDB *gorm.DB + tursoDB *sql.DB version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -269,19 +271,16 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { container.logger.Fatal(err) } - sqlDB, err := db.DB() - if err != nil { - container.logger.Fatal(stacktrace.Propagate(err, "cannot get sql.DB from GORM")) - } - - sqlDB.SetMaxOpenConns(1) - sqlDB.SetMaxIdleConns(0) - sqlDB.SetConnMaxLifetime(10 * time.Second) - if err = db.Use(tracing.NewPlugin()); err != nil { container.logger.Fatal(stacktrace.Propagate(err, "cannot use GORM tracing plugin")) } + container.dedicatedDB = db + if os.Getenv("DATABASE_MIGRATION_SKIP") != "" { + container.logger.Debug(fmt.Sprintf("skipping migrations for [%T]", db)) + return container.dedicatedDB + } + container.logger.Debug(fmt.Sprintf("Running migrations for dedicated [%T]", db)) if err = db.AutoMigrate(&entities.Heartbeat{}); err != nil { container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.Heartbeat{}))) @@ -291,10 +290,43 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.HeartbeatMonitor{}))) } - container.dedicatedDB = db return container.dedicatedDB } +// TursoDB creates a *sql.DB connection to a Turso/libSQL database +func (container *Container) TursoDB() *sql.DB { + if container.tursoDB != nil { + return container.tursoDB + } + + container.logger.Debug("creating Turso *sql.DB connection") + + db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) + if err != nil { + container.logger.Fatal(err) + } + + container.tursoDB = db + return container.tursoDB +} + +// HedgingFailureCounter creates an OTel counter for hedging secondary write failures +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter( + container.projectID, + otelMetric.WithInstrumentationVersion(otel.Version()), + ) + counter, err := meter.Int64Counter( + "hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter")) + } + return counter +} + // DBWithoutMigration creates an instance of gorm.DB if it has not been created already func (container *Container) DBWithoutMigration() (db *gorm.DB) { if container.db != nil { @@ -889,12 +921,31 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { - container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") - return repositories.NewGormHeartbeatMonitorRepository( - container.Logger(), - container.Tracer(), - container.DedicatedDB(), - ) + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository") + return repositories.NewLibsqlHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + container.TursoDB(), + ) + case "hedging": + container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") + return repositories.NewHedgingHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") + return repositories.NewGormHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + container.DedicatedDB(), + ) + } } // HeartbeatService creates a new instance of services.HeartbeatService @@ -1708,12 +1759,31 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { - container.logger.Debug("creating GORM repositories.HeartbeatRepository") - return repositories.NewGormHeartbeatRepository( - container.Logger(), - container.Tracer(), - container.DedicatedDB(), - ) + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + container.logger.Debug("creating libSQL repositories.HeartbeatRepository") + return repositories.NewLibsqlHeartbeatRepository( + container.Logger(), + container.Tracer(), + container.TursoDB(), + ) + case "hedging": + container.logger.Debug("creating hedging repositories.HeartbeatRepository") + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + container.logger.Debug("creating GORM repositories.HeartbeatRepository") + return repositories.NewGormHeartbeatRepository( + container.Logger(), + container.Tracer(), + container.DedicatedDB(), + ) + } } // UserRepository registers a new instance of repositories.UserRepository diff --git a/api/pkg/repositories/hedging_heartbeat_monitor_repository.go b/api/pkg/repositories/hedging_heartbeat_monitor_repository.go new file mode 100644 index 00000000..3304230c --- /dev/null +++ b/api/pkg/repositories/hedging_heartbeat_monitor_repository.go @@ -0,0 +1,128 @@ +package repositories + +import ( + "context" + "fmt" + + "github.com/google/uuid" + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// hedgingHeartbeatMonitorRepository writes to both primary and secondary repositories. +// Reads only hit primary. Secondary writes are fail-open. +type hedgingHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatMonitorRepository + secondary HeartbeatMonitorRepository + failureCounter otelMetric.Int64Counter +} + +// NewHedgingHeartbeatMonitorRepository creates a hedging HeartbeatMonitorRepository +func NewHedgingHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatMonitorRepository, + secondary HeartbeatMonitorRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatMonitorRepository { + return &hedgingHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatMonitorRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} + +func (repository *hedgingHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Store(ctx, monitor); err != nil { + return err + } + + if err := repository.secondary.Store(ctx, monitor); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for monitor [%s]", monitor.ID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + return repository.primary.Load(ctx, userID, phoneNumber) +} + +func (repository *hedgingHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + return repository.primary.Exists(ctx, userID, monitorID) +} + +func (repository *hedgingHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.UpdateQueueID(ctx, monitorID, queueID); err != nil { + return err + } + + if err := repository.secondary.UpdateQueueID(ctx, monitorID, queueID); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdateQueueID failed for monitor [%s]", monitorID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Delete(ctx, userID, phoneNumber); err != nil { + return err + } + + if err := repository.secondary.Delete(ctx, userID, phoneNumber); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for monitor with owner [%s]", phoneNumber))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { + return err + } + + if err := repository.secondary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdatePhoneOnline failed for monitor [%s]", monitorID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { + return err + } + + if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete all failed for user [%s]", userID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} diff --git a/api/pkg/repositories/hedging_heartbeat_repository.go b/api/pkg/repositories/hedging_heartbeat_repository.go new file mode 100644 index 00000000..07346264 --- /dev/null +++ b/api/pkg/repositories/hedging_heartbeat_repository.go @@ -0,0 +1,79 @@ +package repositories + +import ( + "context" + "fmt" + + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// hedgingHeartbeatRepository writes to both primary and secondary repositories. +// Reads only hit primary. Secondary writes are fail-open. +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +// NewHedgingHeartbeatRepository creates a hedging HeartbeatRepository +func NewHedgingHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatRepository, + secondary HeartbeatRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatRepository { + return &hedgingHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} + +func (repository *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Store(ctx, heartbeat); err != nil { + return err + } + + if err := repository.secondary.Store(ctx, heartbeat); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for heartbeat [%s]", heartbeat.ID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + return repository.primary.Index(ctx, userID, owner, params) +} + +func (repository *hedgingHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + return repository.primary.Last(ctx, userID, owner) +} + +func (repository *hedgingHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { + return err + } + + if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for user [%s]", userID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go new file mode 100644 index 00000000..66ee8c0e --- /dev/null +++ b/api/pkg/repositories/libsql.go @@ -0,0 +1,67 @@ +package repositories + +import ( + "database/sql" + "fmt" + + _ "github.com/tursodatabase/libsql-client-go/libsql" // libSQL database driver + + "github.com/palantir/stacktrace" +) + +const ( + tableHeartbeats = "heartbeats" + tableHeartbeatMonitors = "heartbeat_monitors" +) + +// NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables +func NewTursoDB(dsn string) (*sql.DB, error) { + db, err := sql.Open("libsql", dsn) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database with DSN [%s]", dsn)) + } + + if err = db.Ping(); err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database with DSN [%s]", dsn)) + } + + if err = createTursoTables(db); err != nil { + return nil, stacktrace.Propagate(err, "cannot create turso tables") + } + + return db, nil +} + +func createTursoTables(db *sql.DB) error { + statements := []string{ + `CREATE TABLE IF NOT EXISTS ` + tableHeartbeats + ` ( + id TEXT PRIMARY KEY, + owner TEXT NOT NULL, + version TEXT NOT NULL, + charging INTEGER NOT NULL DEFAULT 0, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON ` + tableHeartbeats + `(owner, timestamp)`, + `CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON ` + tableHeartbeats + `(user_id)`, + `CREATE TABLE IF NOT EXISTS ` + tableHeartbeatMonitors + ` ( + id TEXT PRIMARY KEY, + phone_id TEXT NOT NULL, + user_id TEXT NOT NULL, + queue_id TEXT NOT NULL DEFAULT '', + owner TEXT NOT NULL, + phone_online INTEGER NOT NULL DEFAULT 1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON ` + tableHeartbeatMonitors + `(user_id, owner)`, + } + + for _, stmt := range statements { + if _, err := db.Exec(stmt); err != nil { + return stacktrace.Propagate(err, fmt.Sprintf("cannot execute statement: %s", stmt)) + } + } + + return nil +} diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go new file mode 100644 index 00000000..0cb594af --- /dev/null +++ b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go @@ -0,0 +1,199 @@ +package repositories + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/google/uuid" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// libsqlHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in Turso/libSQL +type libsqlHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + db *sql.DB +} + +// NewLibsqlHeartbeatMonitorRepository creates the libSQL version of the HeartbeatMonitorRepository +func NewLibsqlHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *sql.DB, +) HeartbeatMonitorRepository { + return &libsqlHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatMonitorRepository{})), + tracer: tracer, + db: db, + } +} + +func (repository *libsqlHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "INSERT INTO "+tableHeartbeatMonitors+" (id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + monitor.ID.String(), + monitor.PhoneID.String(), + string(monitor.UserID), + monitor.QueueID, + monitor.Owner, + boolToInt(monitor.PhoneOnline), + monitor.CreatedAt.UTC(), + monitor.UpdatedAt.UTC(), + ) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + row := repository.db.QueryRowContext(ctx, + "SELECT id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ? LIMIT 1", + string(userID), phoneNumber, + ) + + monitor, err := repository.scanHeartbeatMonitorRow(row) + if err == sql.ErrNoRows { + msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return monitor, nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + var count int + err := repository.db.QueryRowContext(ctx, + "SELECT COUNT(*) FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND id = ?", + string(userID), monitorID.String(), + ).Scan(&count) + if err != nil { + msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) + return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return count > 0, nil +} + +func (repository *libsqlHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "UPDATE "+tableHeartbeatMonitors+" SET queue_id = ?, updated_at = ? WHERE id = ?", + queueID, time.Now().UTC(), monitorID.String(), + ) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ?", + string(userID), phoneNumber, + ) + if err != nil { + msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "UPDATE "+tableHeartbeatMonitors+" SET phone_online = ?, updated_at = ? WHERE id = ? AND user_id = ?", + boolToInt(online), time.Now().UTC(), monitorID.String(), string(userID), + ) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID)) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { + monitor := new(entities.HeartbeatMonitor) + var id, phoneID, userID string + var phoneOnline int + err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt) + if err != nil { + return nil, err + } + monitor.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", id)) + } + monitor.PhoneID, err = uuid.Parse(phoneID) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", phoneID)) + } + monitor.UserID = entities.UserID(userID) + monitor.PhoneOnline = phoneOnline != 0 + return monitor, nil +} diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go new file mode 100644 index 00000000..42fdf911 --- /dev/null +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -0,0 +1,186 @@ +package repositories + +import ( + "context" + "database/sql" + "fmt" + + "github.com/google/uuid" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// libsqlHeartbeatRepository is responsible for persisting entities.Heartbeat in Turso/libSQL +type libsqlHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + db *sql.DB +} + +// NewLibsqlHeartbeatRepository creates the libSQL version of the HeartbeatRepository +func NewLibsqlHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *sql.DB, +) HeartbeatRepository { + return &libsqlHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatRepository{})), + tracer: tracer, + db: db, + } +} + +func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "INSERT INTO "+tableHeartbeats+" (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)", + heartbeat.ID.String(), + heartbeat.Owner, + heartbeat.Version, + boolToInt(heartbeat.Charging), + string(heartbeat.UserID), + heartbeat.Timestamp.UTC(), + ) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + var rows *sql.Rows + var err error + + if len(params.Query) > 0 { + queryPattern := "%" + params.Query + "%" + rows, err = repository.db.QueryContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? AND version LIKE ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", + string(userID), owner, queryPattern, params.Limit, params.Skip, + ) + } else { + rows, err = repository.db.QueryContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", + string(userID), owner, params.Limit, params.Skip, + ) + } + if err != nil { + msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + defer rows.Close() + + heartbeats := make([]entities.Heartbeat, 0) + for rows.Next() { + heartbeat, scanErr := repository.scanHeartbeat(rows) + if scanErr != nil { + msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) + } + heartbeats = append(heartbeats, *heartbeat) + } + if rowsErr := rows.Err(); rowsErr != nil { + msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg)) + } + + return &heartbeats, nil +} + +func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + row := repository.db.QueryRowContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1", + string(userID), owner, + ) + + heartbeat, err := repository.scanHeartbeatRow(row) + if err == sql.ErrNoRows { + msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return heartbeat, nil +} + +func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID)) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatRepository) scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { + heartbeat := new(entities.Heartbeat) + var id string + var charging int + var userID string + err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) + if err != nil { + return nil, err + } + heartbeat.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) + } + heartbeat.Charging = charging != 0 + heartbeat.UserID = entities.UserID(userID) + return heartbeat, nil +} + +func (repository *libsqlHeartbeatRepository) scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { + heartbeat := new(entities.Heartbeat) + var id string + var charging int + var userID string + err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) + if err != nil { + return nil, err + } + heartbeat.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) + } + heartbeat.Charging = charging != 0 + heartbeat.UserID = entities.UserID(userID) + return heartbeat, nil +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/docs/superpowers/plans/2026-05-15-hedging-repository.md b/docs/superpowers/plans/2026-05-15-hedging-repository.md new file mode 100644 index 00000000..e3b838fa --- /dev/null +++ b/docs/superpowers/plans/2026-05-15-hedging-repository.md @@ -0,0 +1,265 @@ +# Hedging Repository Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +> **Status:** All tasks implemented. This plan was written retroactively to document and verify the implementation. + +**Goal:** Create composite hedging repositories that dual-write to GORM (primary) and Turso (secondary) with fail-open semantics on secondary failures. + +**Architecture:** Two new repository files implement the existing `HeartbeatRepository` and `HeartbeatMonitorRepository` interfaces by delegating reads to primary only and writes to both. Secondary write failures are logged and counted via an OTel metric but never propagated. Activated via `HEARTBEAT_DB_BACKEND=hedging`. + +**Tech Stack:** Go, OpenTelemetry metrics (`go.opentelemetry.io/otel/metric`), existing repository interfaces + +**Spec:** `docs/superpowers/specs/2026-05-15-hedging-repository-design.md` + +--- + +### Task 1: Create hedging heartbeat repository ✅ + +**Files:** + +- Created: `api/pkg/repositories/hedging_heartbeat_repository.go` + +- [ ] **Step 1: Create the hedging heartbeat repository file** + +```go +package repositories + +import ( + "context" + "fmt" + + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +func NewHedgingHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatRepository, + secondary HeartbeatRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatRepository { + return &hedgingHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} +``` + +Implement 4 methods: + +- `Store` — write to primary, then secondary (fail-open with log + metric) +- `Index` — delegate to primary only +- `Last` — delegate to primary only +- `DeleteAllForUser` — write to primary, then secondary (fail-open with log + metric) + +Write methods follow this pattern: + +```go +func (r *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := r.tracer.Start(ctx) + defer span.End() + + if err := r.primary.Store(ctx, heartbeat); err != nil { + return err + } + + if err := r.secondary.Store(ctx, heartbeat); err != nil { + r.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for heartbeat [%s]", heartbeat.ID))) + r.failureCounter.Add(ctx, 1) + } + + return nil +} +``` + +Read methods simply delegate: + +```go +func (r *hedgingHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + return r.primary.Index(ctx, userID, owner, params) +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Commit** + +```bash +git add api/pkg/repositories/hedging_heartbeat_repository.go +git commit -m "feat(api): add hedging heartbeat repository" +``` + +--- + +### Task 2: Create hedging heartbeat monitor repository ✅ + +**Files:** + +- Created: `api/pkg/repositories/hedging_heartbeat_monitor_repository.go` + +- [ ] **Step 1: Create the hedging heartbeat monitor repository file** + +Same struct pattern as Task 1 but wrapping `HeartbeatMonitorRepository` interface. + +Implement 7 methods: + +| Method | Behavior | +| ------------------- | ---------------------- | +| `Store` | Write both (fail-open) | +| `Load` | Primary only | +| `Exists` | Primary only | +| `UpdateQueueID` | Write both (fail-open) | +| `Delete` | Write both (fail-open) | +| `UpdatePhoneOnline` | Write both (fail-open) | +| `DeleteAllForUser` | Write both (fail-open) | + +All write methods follow the same fail-open pattern: primary must succeed, secondary logs + increments counter on failure. + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Commit** + +```bash +git add api/pkg/repositories/hedging_heartbeat_monitor_repository.go +git commit -m "feat(api): add hedging heartbeat monitor repository" +``` + +--- + +### Task 3: Add HedgingFailureCounter to DI container ✅ + +**Files:** + +- Modified: `api/pkg/di/container.go` (added `HedgingFailureCounter()` method after `TursoDB()`, ~line 320) + +- [ ] **Step 1: Add the HedgingFailureCounter method** + +```go +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter( + container.projectID, + otelMetric.WithInstrumentationVersion(otel.Version()), + ) + counter, err := meter.Int64Counter( + "hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter")) + } + return counter +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +--- + +### Task 4: Wire hedging mode in DI container ✅ + +**Files:** + +- Modified: `api/pkg/di/container.go` + + - `HeartbeatRepository()` (~line 1768) + - `HeartbeatMonitorRepository()` (~line 930) + +- [ ] **Step 1: Change both methods from if/else to switch** + +Replace the existing `if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso"` with a switch: + +```go +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + // existing libSQL path + case "hedging": + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + // existing GORM path + } +} +``` + +Same pattern for `HeartbeatMonitorRepository()`. + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Run pre-commit hooks** + +Run: `cd api && gofumpt -w pkg/repositories/hedging_heartbeat_repository.go pkg/repositories/hedging_heartbeat_monitor_repository.go pkg/di/container.go` +Expected: exit code 0 + +- [ ] **Step 4: Final commit** + +```bash +git add api/pkg/di/container.go +git commit -m "feat(api): wire hedging mode in DI container (HEARTBEAT_DB_BACKEND=hedging)" +``` + +--- + +### Task 5: Final verification + +- [ ] **Step 1: Full build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 2: Run tests** + +Run: `cd api && go test ./...` +Expected: all tests pass + +- [ ] **Step 3: go vet (no new warnings)** + +Run: `cd api && go vet ./... 2>&1 | Select-String "hedging"` +Expected: only pre-existing `non-constant format string` warnings (same category as all other repos) + +- [ ] **Step 4: Pre-commit hooks pass** + +Run: `git add -A && git commit --dry-run` +Expected: all hooks pass (go-fumpt, go-lint, go-imports, go-mod-tidy) + +- [ ] **Step 5: Verify three modes work (code review)** + +Check that the DI container correctly handles all three values: + +- Default (unset) → GORM only +- `turso` → libSQL only +- `hedging` → GORM primary + libSQL secondary diff --git a/docs/superpowers/specs/2026-05-15-hedging-repository-design.md b/docs/superpowers/specs/2026-05-15-hedging-repository-design.md new file mode 100644 index 00000000..2afc6ace --- /dev/null +++ b/docs/superpowers/specs/2026-05-15-hedging-repository-design.md @@ -0,0 +1,164 @@ +# Hedging Repository for Heartbeat & Monitor + +**Date:** 2026-05-15 +**Status:** Approved + +## Overview + +Create composite "hedging" repositories for `HeartbeatRepository` and `HeartbeatMonitorRepository` that write to both GORM (primary) and Turso (secondary). Reads only hit the primary. Secondary writes are fail-open — errors are logged and a metric is emitted, but the operation succeeds from the caller's perspective. + +## Motivation + +Gradually migrate heartbeat data to Turso by dual-writing. The GORM/PostgreSQL backend remains the source of truth while Turso builds up a complete dataset. If Turso has issues, the system is unaffected. + +## Configuration + +Activated via `HEARTBEAT_DB_BACKEND=hedging`. The three modes are now: + +| Value | Behavior | +| ----------------- | ---------------------------------------------------------------------- | +| _(unset/default)_ | GORM/PostgreSQL only | +| `turso` | Turso/libSQL only | +| `hedging` | GORM primary (reads+writes) + Turso secondary (writes only, fail-open) | + +## Architecture + +### New Files + +| File | Purpose | +| -------------------------------------------------------------- | -------------------------------------- | +| `api/pkg/repositories/hedging_heartbeat_repository.go` | Composite `HeartbeatRepository` | +| `api/pkg/repositories/hedging_heartbeat_monitor_repository.go` | Composite `HeartbeatMonitorRepository` | + +### Modified Files + +| File | Change | +| ------------------------- | ------------------------------------------------------------------ | +| `api/pkg/di/container.go` | Add `hedging` case to switch, add `HedgingFailureCounter()` method | + +## Method Delegation + +### HeartbeatRepository + +| Method | Primary (GORM) | Secondary (Turso) | +| ------------------ | -------------- | -------------------- | +| `Store` | ✅ write | ✅ write (fail-open) | +| `Index` | ✅ read | ❌ skip | +| `Last` | ✅ read | ❌ skip | +| `DeleteAllForUser` | ✅ write | ✅ write (fail-open) | + +### HeartbeatMonitorRepository + +| Method | Primary (GORM) | Secondary (Turso) | +| ------------------- | -------------- | -------------------- | +| `Store` | ✅ write | ✅ write (fail-open) | +| `Load` | ✅ read | ❌ skip | +| `Exists` | ✅ read | ❌ skip | +| `UpdateQueueID` | ✅ write | ✅ write (fail-open) | +| `Delete` | ✅ write | ✅ write (fail-open) | +| `UpdatePhoneOnline` | ✅ write | ✅ write (fail-open) | +| `DeleteAllForUser` | ✅ write | ✅ write (fail-open) | + +## Struct Design + +```go +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +type hedgingHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatMonitorRepository + secondary HeartbeatMonitorRepository + failureCounter otelMetric.Int64Counter +} +``` + +## Error Handling (Fail-Open Pattern) + +```go +func (r *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := r.tracer.Start(ctx) + defer span.End() + + // Primary: must succeed + if err := r.primary.Store(ctx, heartbeat); err != nil { + return err + } + + // Secondary: fail-open (log + metric) + if err := r.secondary.Store(ctx, heartbeat); err != nil { + r.logger.Error(stacktrace.Propagate(err, fmt.Sprintf( + "hedging: secondary write failed for heartbeat [%s]", heartbeat.ID, + ))) + r.failureCounter.Add(ctx, 1) + } + + return nil +} +``` + +**Rules:** + +- Primary error → return immediately, no secondary attempt +- Secondary error → log at ERROR level, increment `failureCounter`, return nil +- Read methods → delegate directly to primary, no secondary involvement +- Each method has its own tracing span via `tracer.Start(ctx)` + +## Observability + +**Metric:** + +- Name: `hedging.secondary.write.failures` +- Unit: `1` (count) +- Description: `Number of failed secondary writes in hedging repositories` +- Created once in DI container, shared by both hedging repos + +**Logging:** Each secondary failure logs at ERROR with the method context (entity ID, user ID, etc.) + +## DI Container Changes + +```go +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + return repositories.NewLibsqlHeartbeatRepository(...) + case "hedging": + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + return repositories.NewGormHeartbeatRepository(...) + } +} + +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter(container.projectID) + counter, err := meter.Int64Counter("hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(...) + } + return counter +} +``` + +Same switch pattern for `HeartbeatMonitorRepository()`. + +## Safety + +- Default behavior (no env var) is unchanged — pure GORM/PostgreSQL +- `turso` mode remains available for pure Turso usage +- `hedging` mode never fails due to Turso issues — secondary is fully fail-open +- Service layer requires no changes — same interfaces diff --git a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md new file mode 100644 index 00000000..b501650f --- /dev/null +++ b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md @@ -0,0 +1,162 @@ +# Turso/libSQL Backend for Heartbeat & Monitor Repositories + +**Date:** 2026-05-15 +**Status:** Approved + +## Overview + +Add a libSQL/Turso alternative implementation for `HeartbeatRepository` and `HeartbeatMonitorRepository`, switchable via the `HEARTBEAT_DB_BACKEND` environment variable. When set to `turso`, the API connects to a cloud-hosted Turso database instead of the dedicated PostgreSQL instance. + +## Motivation + +Move heartbeat storage to a dedicated Turso database for cost efficiency and edge performance, while keeping the existing PostgreSQL path as the default fallback. + +## Configuration + +| Env Var | Purpose | Example | +| ---------------------- | -------------------------------------------------------------- | ---------------------------------------------------------------------- | +| `HEARTBEAT_DB_BACKEND` | Selects backend (`turso` = libSQL, anything else = PostgreSQL) | `turso` | +| `TURSO_DATABASE_DSN` | Turso database DSN (URL with authToken query param) | `libsql://httpsms-ndolestudio.aws-us-east-1.turso.io?authToken=eyJ...` | + +When `HEARTBEAT_DB_BACKEND` is not set or is any value other than `turso`, the existing GORM/PostgreSQL path is used unchanged. + +## Architecture + +### Approach + +Direct `database/sql` with the Turso Go remote driver (`github.com/tursodatabase/libsql-client-go`). No ORM — raw SQL queries for a simple 2-table schema. This is the pure-Go HTTP driver for remote Turso Cloud access (no CGo required). + +### New Files + +| File | Purpose | +| ------------------------------------------------------------- | -------------------------------------------------------- | +| `api/pkg/repositories/libsql.go` | Connection factory, table auto-creation, shared helpers | +| `api/pkg/repositories/libsql_heartbeat_repository.go` | `HeartbeatRepository` implementation using libSQL | +| `api/pkg/repositories/libsql_heartbeat_monitor_repository.go` | `HeartbeatMonitorRepository` implementation using libSQL | + +### Modified Files + +| File | Change | +| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | +| `api/pkg/di/container.go` | Add `tursoDB *sql.DB` field, `TursoDB()` method, conditional wiring in `HeartbeatRepository()` and `HeartbeatMonitorRepository()` | +| `api/go.mod` | Add `github.com/tursodatabase/libsql-client-go` dependency | + +## Database Schema + +```sql +CREATE TABLE IF NOT EXISTS heartbeats ( + id TEXT PRIMARY KEY, + owner TEXT NOT NULL, + version TEXT NOT NULL, + charging INTEGER NOT NULL DEFAULT 0, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON heartbeats(owner, timestamp); +CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON heartbeats(user_id); + +CREATE TABLE IF NOT EXISTS heartbeat_monitors ( + id TEXT PRIMARY KEY, + phone_id TEXT NOT NULL, + user_id TEXT NOT NULL, + queue_id TEXT NOT NULL DEFAULT '', + owner TEXT NOT NULL, + phone_online INTEGER NOT NULL DEFAULT 1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON heartbeat_monitors(user_id, owner); +``` + +**Type mappings from PostgreSQL:** + +- UUID → TEXT +- BOOLEAN → INTEGER (0/1) +- TIMESTAMP → DATETIME (ISO 8601 text) + +## Repository Implementations + +### `libsql.go` (shared) + +- `NewTursoDB(url, authToken string) (*sql.DB, error)` — opens connection with libSQL driver, executes CREATE TABLE/INDEX statements +- Returns `*sql.DB` for use by both repository implementations + +### `libsql_heartbeat_repository.go` + +Implements `HeartbeatRepository`: + +| Method | SQL | +| ------------------ | ------------------------------------------------------------------------------------------------------------------------ | +| `Store` | `INSERT INTO heartbeats (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)` | +| `Index` | `SELECT ... WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?` (optional `version LIKE ?` filter) | +| `Last` | `SELECT ... WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1` | +| `DeleteAllForUser` | `DELETE FROM heartbeats WHERE user_id = ?` | + +### `libsql_heartbeat_monitor_repository.go` + +Implements `HeartbeatMonitorRepository`: + +| Method | SQL | +| ------------------- | --------------------------------------------------------------------------- | +| `Store` | INSERT all fields | +| `Load` | SELECT by user_id + owner | +| `Exists` | `SELECT COUNT(*) FROM ... WHERE user_id = ? AND id = ?` (returns count > 0) | +| `UpdateQueueID` | UPDATE queue_id + updated_at WHERE id = ? | +| `Delete` | DELETE WHERE user_id = ? AND owner = ? | +| `UpdatePhoneOnline` | UPDATE phone_online + updated_at WHERE id = ? AND user_id = ? | +| `DeleteAllForUser` | DELETE WHERE user_id = ? | + +### Error Handling + +- `sql.ErrNoRows` → wrap with `stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)` to match GORM behavior expected by the service layer +- All other errors → wrap with `stacktrace.Propagate(err, msg)` + +### Observability + +Every method follows the existing tracing pattern: + +```go +ctx, span := repository.tracer.Start(ctx) +defer span.End() +``` + +## DI Container Changes + +```go +// New field on Container struct +tursoDB *sql.DB + +// New method +func (container *Container) TursoDB() *sql.DB { + if container.tursoDB != nil { + return container.tursoDB + } + db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) + if err != nil { + container.logger.Fatal(err) + } + container.tursoDB = db + return container.tursoDB +} + +// Modified methods +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + return repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()) + } + return repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()) +} + +func (container *Container) HeartbeatMonitorRepository() repositories.HeartbeatMonitorRepository { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + return repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()) + } + return repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()) +} +``` + +## Safety + +- When `HEARTBEAT_DB_BACKEND != "turso"`, `TursoDB()` is never called — no Turso connection is opened +- The existing PostgreSQL path remains the default and is completely unaffected +- Both implementations satisfy the same interface — the service layer requires no changes diff --git a/tests/.env.test b/tests/.env.test index a95703f0..909902ae 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -28,3 +28,5 @@ PUSHER_CLUSTER= GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= +HEARTBEAT_DB_BACKEND=hedging +TURSO_DATABASE_DSN=http://sqld:8080 diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index a4d6b9dc..1e111b78 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -25,6 +25,11 @@ services: timeout: 5s retries: 10 + sqld: + image: ghcr.io/tursodatabase/libsql-server:latest + ports: + - "8090:8080" + wiremock: image: wiremock/wiremock:3x ports: @@ -53,6 +58,8 @@ services: condition: service_healthy wiremock: condition: service_healthy + sqld: + condition: service_started env_file: - .env.test environment: diff --git a/tests/integration_test.go b/tests/integration_test.go index 5aae58ff..12776aac 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -346,3 +346,59 @@ func TestSendSMS_OutstandingFlow(t *testing.T) { assertWebhookJWT(t, req.Request, signingKey) } } + +func TestHeartbeat_StoreAndIndex(t *testing.T) { + ctx := context.Background() + phone := setupPhone(ctx, t, 60) + + // Store a heartbeat via phone API key (retry to allow async phone-API-key association) + storePayload := map[string]interface{}{ + "phone_numbers": []string{phone.PhoneNumber}, + "charging": true, + } + + url := apiBaseURL + "/v1/heartbeats" + var respBody []byte + var statusCode int + deadline := time.Now().Add(15 * time.Second) + for time.Now().Before(deadline) { + body, err := json.Marshal(storePayload) + require.NoError(t, err) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-api-key", phone.PhoneAPIKey) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + respBody, err = io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + + statusCode = resp.StatusCode + if statusCode == http.StatusCreated { + break + } + time.Sleep(500 * time.Millisecond) + } + require.Equal(t, http.StatusCreated, statusCode, "store heartbeat failed: %s", string(respBody)) + + // Read heartbeats back via user API key + client := newAPIClient() + heartbeats, indexResp, err := client.Heartbeats.Index(ctx, &httpsms.HeartbeatIndexParams{ + Owner: phone.PhoneNumber, + Limit: 1, + }) + require.NoError(t, err) + require.Equal(t, http.StatusOK, indexResp.HTTPResponse.StatusCode) + + require.NotNil(t, heartbeats) + require.GreaterOrEqual(t, len(heartbeats.Data), 1, "expected at least 1 heartbeat") + + hb := heartbeats.Data[0] + assert.Equal(t, phone.PhoneNumber, hb.Owner) + assert.True(t, hb.Charging) + assert.False(t, hb.Timestamp.IsZero(), "timestamp should not be zero") +}