From d826c991181c32cee91b525b859b5c9f29eaddf0 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 22:58:12 +0300 Subject: [PATCH 01/14] docs: add design spec for Turso/libSQL heartbeat backend Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...26-05-15-turso-heartbeat-backend-design.md | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md diff --git a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md new file mode 100644 index 00000000..4008c80d --- /dev/null +++ b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md @@ -0,0 +1,163 @@ +# Turso/libSQL Backend for Heartbeat & Monitor Repositories + +**Date:** 2026-05-15 +**Status:** Approved + +## Overview + +Add a libSQL/Turso alternative implementation for `HeartbeatRepository` and `HeartbeatMonitorRepository`, switchable via the `HEARTBEAT_DB_BACKEND` environment variable. When set to `turso`, the API connects to a cloud-hosted Turso database instead of the dedicated PostgreSQL instance. + +## Motivation + +Move heartbeat storage to a dedicated Turso database for cost efficiency and edge performance, while keeping the existing PostgreSQL path as the default fallback. + +## Configuration + +| Env Var | Purpose | Example | +| ---------------------- | -------------------------------------------------------------- | ----------------------------------------------------- | +| `HEARTBEAT_DB_BACKEND` | Selects backend (`turso` = libSQL, anything else = PostgreSQL) | `turso` | +| `TURSO_DATABASE_URL` | Turso database URL | `libsql://httpsms-ndolestudio.aws-us-east-1.turso.io` | +| `TURSO_AUTH_TOKEN` | Turso auth token | `eyJ...` | + +When `HEARTBEAT_DB_BACKEND` is not set or is any value other than `turso`, the existing GORM/PostgreSQL path is used unchanged. 
+ +## Architecture + +### Approach + +Direct `database/sql` with the official Turso Go driver (`github.com/tursodatabase/go-libsql`). No ORM — raw SQL queries for a simple 2-table schema. + +### New Files + +| File | Purpose | +| ------------------------------------------------------------- | -------------------------------------------------------- | +| `api/pkg/repositories/libsql.go` | Connection factory, table auto-creation, shared helpers | +| `api/pkg/repositories/libsql_heartbeat_repository.go` | `HeartbeatRepository` implementation using libSQL | +| `api/pkg/repositories/libsql_heartbeat_monitor_repository.go` | `HeartbeatMonitorRepository` implementation using libSQL | + +### Modified Files + +| File | Change | +| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | +| `api/pkg/di/container.go` | Add `tursoDB *sql.DB` field, `TursoDB()` method, conditional wiring in `HeartbeatRepository()` and `HeartbeatMonitorRepository()` | +| `api/go.mod` | Add `github.com/tursodatabase/go-libsql` dependency | + +## Database Schema + +```sql +CREATE TABLE IF NOT EXISTS heartbeats ( + id TEXT PRIMARY KEY, + owner TEXT NOT NULL, + version TEXT NOT NULL, + charging INTEGER NOT NULL DEFAULT 0, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON heartbeats(owner, timestamp); +CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON heartbeats(user_id); + +CREATE TABLE IF NOT EXISTS heartbeat_monitors ( + id TEXT PRIMARY KEY, + phone_id TEXT NOT NULL, + user_id TEXT NOT NULL, + queue_id TEXT NOT NULL DEFAULT '', + owner TEXT NOT NULL, + phone_online INTEGER NOT NULL DEFAULT 1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON heartbeat_monitors(user_id, owner); +``` + +**Type mappings from PostgreSQL:** + +- UUID → TEXT +- 
BOOLEAN → INTEGER (0/1) +- TIMESTAMP → DATETIME (ISO 8601 text) + +## Repository Implementations + +### `libsql.go` (shared) + +- `NewTursoDB(url, authToken string) (*sql.DB, error)` — opens connection with libSQL driver, executes CREATE TABLE/INDEX statements +- Returns `*sql.DB` for use by both repository implementations + +### `libsql_heartbeat_repository.go` + +Implements `HeartbeatRepository`: + +| Method | SQL | +| ------------------ | ------------------------------------------------------------------------------------------------------------------------ | +| `Store` | `INSERT INTO heartbeats (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)` | +| `Index` | `SELECT ... WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?` (optional `version LIKE ?` filter) | +| `Last` | `SELECT ... WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1` | +| `DeleteAllForUser` | `DELETE FROM heartbeats WHERE user_id = ?` | + +### `libsql_heartbeat_monitor_repository.go` + +Implements `HeartbeatMonitorRepository`: + +| Method | SQL | +| ------------------- | --------------------------------------------------------------------------- | +| `Store` | INSERT all fields | +| `Load` | SELECT by user_id + owner | +| `Exists` | `SELECT COUNT(*) FROM ... WHERE user_id = ? AND id = ?` (returns count > 0) | +| `UpdateQueueID` | UPDATE queue_id + updated_at WHERE id = ? | +| `Delete` | DELETE WHERE user_id = ? AND owner = ? | +| `UpdatePhoneOnline` | UPDATE phone_online + updated_at WHERE id = ? AND user_id = ? | +| `DeleteAllForUser` | DELETE WHERE user_id = ? 
| + +### Error Handling + +- `sql.ErrNoRows` → wrap with `stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)` to match GORM behavior expected by the service layer +- All other errors → wrap with `stacktrace.Propagate(err, msg)` + +### Observability + +Every method follows the existing tracing pattern: + +```go +ctx, span := repository.tracer.Start(ctx) +defer span.End() +``` + +## DI Container Changes + +```go +// New field on Container struct +tursoDB *sql.DB + +// New method +func (container *Container) TursoDB() *sql.DB { + if container.tursoDB != nil { + return container.tursoDB + } + db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_URL"), os.Getenv("TURSO_AUTH_TOKEN")) + if err != nil { + container.logger.Fatal(err) + } + container.tursoDB = db + return container.tursoDB +} + +// Modified methods +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + return repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()) + } + return repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()) +} + +func (container *Container) HeartbeatMonitorRepository() repositories.HeartbeatMonitorRepository { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + return repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()) + } + return repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()) +} +``` + +## Safety + +- When `HEARTBEAT_DB_BACKEND != "turso"`, `TursoDB()` is never called — no Turso connection is opened +- The existing PostgreSQL path remains the default and is completely unaffected +- Both implementations satisfy the same interface — the service layer requires no changes From 017c6d6dfc6fb25f9ab9e6690c8a6d966f7cb7bd Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 
May 2026 23:10:14 +0300 Subject: [PATCH 02/14] feat(api): add Turso/libSQL backend for heartbeat repositories Add alternative HeartbeatRepository and HeartbeatMonitorRepository implementations using libSQL (Turso) via database/sql. Switchable via HEARTBEAT_DB_BACKEND=turso env var. Requires TURSO_DATABASE_URL and TURSO_AUTH_TOKEN when enabled. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/go.mod | 4 + api/go.sum | 8 + api/pkg/di/container.go | 38 ++++ api/pkg/repositories/libsql.go | 68 +++++++ .../libsql_heartbeat_monitor_repository.go | 190 ++++++++++++++++++ .../libsql_heartbeat_repository.go | 173 ++++++++++++++++ 6 files changed, 481 insertions(+) create mode 100644 api/pkg/repositories/libsql.go create mode 100644 api/pkg/repositories/libsql_heartbeat_monitor_repository.go create mode 100644 api/pkg/repositories/libsql_heartbeat_repository.go diff --git a/api/go.mod b/api/go.mod index 0c6ccd0b..d77c0bf6 100644 --- a/api/go.mod +++ b/api/go.mod @@ -44,6 +44,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 github.com/thedevsaddam/govalidator v1.9.10 + github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 go.opentelemetry.io/otel v1.43.0 @@ -93,11 +94,13 @@ require ( github.com/PuerkitoBio/goquery v1.12.0 // indirect github.com/andybalholm/brotli v1.2.1 // indirect github.com/andybalholm/cascadia v1.3.3 // indirect + github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.11.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect + github.com/coder/websocket v1.8.12 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.3 // 
indirect github.com/fatih/color v1.19.0 // indirect @@ -183,6 +186,7 @@ require ( go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.50.0 // indirect + golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect diff --git a/api/go.sum b/api/go.sum index 5af5496f..0710b094 100644 --- a/api/go.sum +++ b/api/go.sum @@ -66,6 +66,8 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= +github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= +github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k= github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -88,6 +90,8 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw= github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0= +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -320,6 +324,8 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90= github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44= github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ= +github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0= +github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU= github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U= github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -410,6 +416,8 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= +golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 2ecfa04d..24f3a174 100644 --- 
a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -3,6 +3,7 @@ package di import ( "context" "crypto/tls" + "database/sql" "fmt" "net/http" "os" @@ -82,6 +83,7 @@ type Container struct { projectID string db *gorm.DB dedicatedDB *gorm.DB + tursoDB *sql.DB version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -295,6 +297,26 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { return container.dedicatedDB } +// TursoDB creates a *sql.DB connection to a Turso/libSQL database +func (container *Container) TursoDB() *sql.DB { + if container.tursoDB != nil { + return container.tursoDB + } + + container.logger.Debug("creating Turso *sql.DB connection") + + db, err := repositories.NewTursoDB( + os.Getenv("TURSO_DATABASE_URL"), + os.Getenv("TURSO_AUTH_TOKEN"), + ) + if err != nil { + container.logger.Fatal(err) + } + + container.tursoDB = db + return container.tursoDB +} + // DBWithoutMigration creates an instance of gorm.DB if it has not been created already func (container *Container) DBWithoutMigration() (db *gorm.DB) { if container.db != nil { @@ -889,6 +911,14 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository") + return repositories.NewLibsqlHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + container.TursoDB(), + ) + } container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") return repositories.NewGormHeartbeatMonitorRepository( container.Logger(), @@ -1708,6 +1738,14 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func 
(container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + container.logger.Debug("creating libSQL repositories.HeartbeatRepository") + return repositories.NewLibsqlHeartbeatRepository( + container.Logger(), + container.Tracer(), + container.TursoDB(), + ) + } container.logger.Debug("creating GORM repositories.HeartbeatRepository") return repositories.NewGormHeartbeatRepository( container.Logger(), diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go new file mode 100644 index 00000000..388c475c --- /dev/null +++ b/api/pkg/repositories/libsql.go @@ -0,0 +1,68 @@ +package repositories + +import ( + "database/sql" + "fmt" + + _ "github.com/tursodatabase/libsql-client-go/libsql" // libSQL database driver + + "github.com/palantir/stacktrace" +) + +const ( + tableHeartbeats = "heartbeats" + tableHeartbeatMonitors = "heartbeat_monitors" +) + +// NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables +func NewTursoDB(url, authToken string) (*sql.DB, error) { + dsn := url + "?authToken=" + authToken + db, err := sql.Open("libsql", dsn) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database at [%s]", url)) + } + + if err = db.Ping(); err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database at [%s]", url)) + } + + if err = createTursoTables(db); err != nil { + return nil, stacktrace.Propagate(err, "cannot create turso tables") + } + + return db, nil +} + +func createTursoTables(db *sql.DB) error { + statements := []string{ + `CREATE TABLE IF NOT EXISTS ` + tableHeartbeats + ` ( + id TEXT PRIMARY KEY, + owner TEXT NOT NULL, + version TEXT NOT NULL, + charging INTEGER NOT NULL DEFAULT 0, + user_id TEXT NOT NULL, + timestamp DATETIME NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON ` + tableHeartbeats + `(owner, timestamp)`, + `CREATE INDEX 
IF NOT EXISTS idx_heartbeats_user_id ON ` + tableHeartbeats + `(user_id)`, + `CREATE TABLE IF NOT EXISTS ` + tableHeartbeatMonitors + ` ( + id TEXT PRIMARY KEY, + phone_id TEXT NOT NULL, + user_id TEXT NOT NULL, + queue_id TEXT NOT NULL DEFAULT '', + owner TEXT NOT NULL, + phone_online INTEGER NOT NULL DEFAULT 1, + created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON ` + tableHeartbeatMonitors + `(user_id, owner)`, + } + + for _, stmt := range statements { + if _, err := db.Exec(stmt); err != nil { + return stacktrace.Propagate(err, fmt.Sprintf("cannot execute statement: %s", stmt)) + } + } + + return nil +} diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go new file mode 100644 index 00000000..9477f2d7 --- /dev/null +++ b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go @@ -0,0 +1,190 @@ +package repositories + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/google/uuid" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// libsqlHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in Turso/libSQL +type libsqlHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + db *sql.DB +} + +// NewLibsqlHeartbeatMonitorRepository creates the libSQL version of the HeartbeatMonitorRepository +func NewLibsqlHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *sql.DB, +) HeartbeatMonitorRepository { + return &libsqlHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatMonitorRepository{})), + tracer: tracer, + db: db, + } +} + +func (repository *libsqlHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span 
:= repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "INSERT INTO "+tableHeartbeatMonitors+" (id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + monitor.ID.String(), + monitor.PhoneID.String(), + string(monitor.UserID), + monitor.QueueID, + monitor.Owner, + boolToInt(monitor.PhoneOnline), + monitor.CreatedAt.UTC(), + monitor.UpdatedAt.UTC(), + ) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + row := repository.db.QueryRowContext(ctx, + "SELECT id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ? 
LIMIT 1", + string(userID), phoneNumber, + ) + + monitor, err := scanHeartbeatMonitorRow(row) + if err == sql.ErrNoRows { + msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return monitor, nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + var count int + err := repository.db.QueryRowContext(ctx, + "SELECT COUNT(*) FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND id = ?", + string(userID), monitorID.String(), + ).Scan(&count) + if err != nil { + msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) + return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return count > 0, nil +} + +func (repository *libsqlHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "UPDATE "+tableHeartbeatMonitors+" SET queue_id = ?, updated_at = ? 
WHERE id = ?", + queueID, time.Now().UTC(), monitorID.String(), + ) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ?", + string(userID), phoneNumber, + ) + if err != nil { + msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "UPDATE "+tableHeartbeatMonitors+" SET phone_online = ?, updated_at = ? WHERE id = ? 
AND user_id = ?", + boolToInt(online), time.Now().UTC(), monitorID.String(), string(userID), + ) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID)) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { + monitor := new(entities.HeartbeatMonitor) + var id, phoneID, userID string + var phoneOnline int + err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt) + if err != nil { + return nil, err + } + monitor.ID, _ = uuid.Parse(id) + monitor.PhoneID, _ = uuid.Parse(phoneID) + monitor.UserID = entities.UserID(userID) + monitor.PhoneOnline = phoneOnline != 0 + return monitor, nil +} diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go new file mode 100644 index 00000000..ce1620eb --- /dev/null +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -0,0 +1,173 @@ +package repositories + +import ( + "context" + "database/sql" + "fmt" + + "github.com/google/uuid" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// libsqlHeartbeatRepository is responsible for persisting entities.Heartbeat in Turso/libSQL +type 
libsqlHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + db *sql.DB +} + +// NewLibsqlHeartbeatRepository creates the libSQL version of the HeartbeatRepository +func NewLibsqlHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + db *sql.DB, +) HeartbeatRepository { + return &libsqlHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatRepository{})), + tracer: tracer, + db: db, + } +} + +func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.db.ExecContext(ctx, + "INSERT INTO "+tableHeartbeats+" (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)", + heartbeat.ID.String(), + heartbeat.Owner, + heartbeat.Version, + boolToInt(heartbeat.Charging), + string(heartbeat.UserID), + heartbeat.Timestamp.UTC(), + ) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + var rows *sql.Rows + var err error + + if len(params.Query) > 0 { + queryPattern := "%" + params.Query + "%" + rows, err = repository.db.QueryContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? AND version LIKE ? ORDER BY timestamp DESC LIMIT ? 
OFFSET ?", + string(userID), owner, queryPattern, params.Limit, params.Skip, + ) + } else { + rows, err = repository.db.QueryContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", + string(userID), owner, params.Limit, params.Skip, + ) + } + if err != nil { + msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + defer rows.Close() + + heartbeats := make([]entities.Heartbeat, 0) + for rows.Next() { + heartbeat, scanErr := scanHeartbeat(rows) + if scanErr != nil { + msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) + } + heartbeats = append(heartbeats, *heartbeat) + } + + return &heartbeats, nil +} + +func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + row := repository.db.QueryRowContext(ctx, + "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? 
ORDER BY timestamp DESC LIMIT 1", + string(userID), owner, + ) + + heartbeat, err := scanHeartbeatRow(row) + if err == sql.ErrNoRows { + msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return heartbeat, nil +} + +func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID)) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { + heartbeat := new(entities.Heartbeat) + var id string + var charging int + var userID string + err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) + if err != nil { + return nil, err + } + heartbeat.ID, _ = uuid.Parse(id) + heartbeat.Charging = charging != 0 + heartbeat.UserID = entities.UserID(userID) + return heartbeat, nil +} + +func scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { + heartbeat := new(entities.Heartbeat) + var id string + var charging int + var userID string + err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) + if err != nil { + return nil, err + } + heartbeat.ID, _ = uuid.Parse(id) + heartbeat.Charging = charging != 0 + heartbeat.UserID = entities.UserID(userID) + return heartbeat, nil +} + +func 
boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} From daff415df9056b113990741fdb4feac8bdb3167b Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:14:32 +0300 Subject: [PATCH 03/14] fix(api): add context timeout to DeleteAllForUser in libSQL repos Also update design spec to reference correct package (libsql-client-go, not go-libsql). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/repositories/libsql_heartbeat_monitor_repository.go | 3 +++ api/pkg/repositories/libsql_heartbeat_repository.go | 3 +++ .../specs/2026-05-15-turso-heartbeat-backend-design.md | 4 ++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go index 9477f2d7..3987b058 100644 --- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go @@ -165,6 +165,9 @@ func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context ctx, span := repository.tracer.Start(ctx) defer span.End() + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID)) if err != nil { msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go index ce1620eb..34f4398f 100644 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -126,6 +126,9 @@ func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Contex ctx, span := repository.tracer.Start(ctx) defer span.End() + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + _, err := 
repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID)) if err != nil { msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) diff --git a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md index 4008c80d..da489276 100644 --- a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md +++ b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md @@ -25,7 +25,7 @@ When `HEARTBEAT_DB_BACKEND` is not set or is any value other than `turso`, the e ### Approach -Direct `database/sql` with the official Turso Go driver (`github.com/tursodatabase/go-libsql`). No ORM — raw SQL queries for a simple 2-table schema. +Direct `database/sql` with the Turso Go remote driver (`github.com/tursodatabase/libsql-client-go`). No ORM — raw SQL queries for a simple 2-table schema. This is the pure-Go HTTP driver for remote Turso Cloud access (no CGo required). 
### New Files @@ -40,7 +40,7 @@ Direct `database/sql` with the official Turso Go driver (`github.com/tursodataba | File | Change | | ------------------------- | --------------------------------------------------------------------------------------------------------------------------------- | | `api/pkg/di/container.go` | Add `tursoDB *sql.DB` field, `TursoDB()` method, conditional wiring in `HeartbeatRepository()` and `HeartbeatMonitorRepository()` | -| `api/go.mod` | Add `github.com/tursodatabase/go-libsql` dependency | +| `api/go.mod` | Add `github.com/tursodatabase/libsql-client-go` dependency | ## Database Schema From 064a9a316f2c3c24d7414f0962d54f402b6d317d Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:34:32 +0300 Subject: [PATCH 04/14] docs: add design spec for hedging repository pattern Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../2026-05-15-hedging-repository-design.md | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 docs/superpowers/specs/2026-05-15-hedging-repository-design.md diff --git a/docs/superpowers/specs/2026-05-15-hedging-repository-design.md b/docs/superpowers/specs/2026-05-15-hedging-repository-design.md new file mode 100644 index 00000000..2afc6ace --- /dev/null +++ b/docs/superpowers/specs/2026-05-15-hedging-repository-design.md @@ -0,0 +1,164 @@ +# Hedging Repository for Heartbeat & Monitor + +**Date:** 2026-05-15 +**Status:** Approved + +## Overview + +Create composite "hedging" repositories for `HeartbeatRepository` and `HeartbeatMonitorRepository` that write to both GORM (primary) and Turso (secondary). Reads only hit the primary. Secondary writes are fail-open — errors are logged and a metric is emitted, but the operation succeeds from the caller's perspective. + +## Motivation + +Gradually migrate heartbeat data to Turso by dual-writing. The GORM/PostgreSQL backend remains the source of truth while Turso builds up a complete dataset. 
If Turso has issues, the system is unaffected. + +## Configuration + +Activated via `HEARTBEAT_DB_BACKEND=hedging`. The three modes are now: + +| Value | Behavior | +| ----------------- | ---------------------------------------------------------------------- | +| _(unset/default)_ | GORM/PostgreSQL only | +| `turso` | Turso/libSQL only | +| `hedging` | GORM primary (reads+writes) + Turso secondary (writes only, fail-open) | + +## Architecture + +### New Files + +| File | Purpose | +| -------------------------------------------------------------- | -------------------------------------- | +| `api/pkg/repositories/hedging_heartbeat_repository.go` | Composite `HeartbeatRepository` | +| `api/pkg/repositories/hedging_heartbeat_monitor_repository.go` | Composite `HeartbeatMonitorRepository` | + +### Modified Files + +| File | Change | +| ------------------------- | ------------------------------------------------------------------ | +| `api/pkg/di/container.go` | Add `hedging` case to switch, add `HedgingFailureCounter()` method | + +## Method Delegation + +### HeartbeatRepository + +| Method | Primary (GORM) | Secondary (Turso) | +| ------------------ | -------------- | -------------------- | +| `Store` | ✅ write | ✅ write (fail-open) | +| `Index` | ✅ read | ❌ skip | +| `Last` | ✅ read | ❌ skip | +| `DeleteAllForUser` | ✅ write | ✅ write (fail-open) | + +### HeartbeatMonitorRepository + +| Method | Primary (GORM) | Secondary (Turso) | +| ------------------- | -------------- | -------------------- | +| `Store` | ✅ write | ✅ write (fail-open) | +| `Load` | ✅ read | ❌ skip | +| `Exists` | ✅ read | ❌ skip | +| `UpdateQueueID` | ✅ write | ✅ write (fail-open) | +| `Delete` | ✅ write | ✅ write (fail-open) | +| `UpdatePhoneOnline` | ✅ write | ✅ write (fail-open) | +| `DeleteAllForUser` | ✅ write | ✅ write (fail-open) | + +## Struct Design + +```go +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + 
secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +type hedgingHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatMonitorRepository + secondary HeartbeatMonitorRepository + failureCounter otelMetric.Int64Counter +} +``` + +## Error Handling (Fail-Open Pattern) + +```go +func (r *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := r.tracer.Start(ctx) + defer span.End() + + // Primary: must succeed + if err := r.primary.Store(ctx, heartbeat); err != nil { + return err + } + + // Secondary: fail-open (log + metric) + if err := r.secondary.Store(ctx, heartbeat); err != nil { + r.logger.Error(stacktrace.Propagate(err, fmt.Sprintf( + "hedging: secondary write failed for heartbeat [%s]", heartbeat.ID, + ))) + r.failureCounter.Add(ctx, 1) + } + + return nil +} +``` + +**Rules:** + +- Primary error → return immediately, no secondary attempt +- Secondary error → log at ERROR level, increment `failureCounter`, return nil +- Read methods → delegate directly to primary, no secondary involvement +- Each method has its own tracing span via `tracer.Start(ctx)` + +## Observability + +**Metric:** + +- Name: `hedging.secondary.write.failures` +- Unit: `1` (count) +- Description: `Number of failed secondary writes in hedging repositories` +- Created once in DI container, shared by both hedging repos + +**Logging:** Each secondary failure logs at ERROR with the method context (entity ID, user ID, etc.) + +## DI Container Changes + +```go +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + return repositories.NewLibsqlHeartbeatRepository(...) 
+ case "hedging": + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + return repositories.NewGormHeartbeatRepository(...) + } +} + +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter(container.projectID) + counter, err := meter.Int64Counter("hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(...) + } + return counter +} +``` + +Same switch pattern for `HeartbeatMonitorRepository()`. + +## Safety + +- Default behavior (no env var) is unchanged — pure GORM/PostgreSQL +- `turso` mode remains available for pure Turso usage +- `hedging` mode never fails due to Turso issues — secondary is fully fail-open +- Service layer requires no changes — same interfaces From e793fdfb561453a865a270776ff4acc56f5fa083 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:42:13 +0300 Subject: [PATCH 05/14] feat(api): add hedging repositories for heartbeat dual-write Add composite repositories that write to GORM (primary) and Turso (secondary) with fail-open semantics. Secondary failures are logged and counted via OTel metric. Activated via HEARTBEAT_DB_BACKEND=hedging. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 67 ++++- .../hedging_heartbeat_monitor_repository.go | 128 +++++++++ .../hedging_heartbeat_repository.go | 79 ++++++ .../plans/2026-05-15-hedging-repository.md | 265 ++++++++++++++++++ 4 files changed, 525 insertions(+), 14 deletions(-) create mode 100644 api/pkg/repositories/hedging_heartbeat_monitor_repository.go create mode 100644 api/pkg/repositories/hedging_heartbeat_repository.go create mode 100644 docs/superpowers/plans/2026-05-15-hedging-repository.md diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 24f3a174..d771813f 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -317,6 +317,23 @@ func (container *Container) TursoDB() *sql.DB { return container.tursoDB } +// HedgingFailureCounter creates an OTel counter for hedging secondary write failures +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter( + container.projectID, + otelMetric.WithInstrumentationVersion(otel.Version()), + ) + counter, err := meter.Int64Counter( + "hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter")) + } + return counter +} + // DBWithoutMigration creates an instance of gorm.DB if it has not been created already func (container *Container) DBWithoutMigration() (db *gorm.DB) { if container.db != nil { @@ -911,20 +928,31 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { - if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + switch 
os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository") return repositories.NewLibsqlHeartbeatMonitorRepository( container.Logger(), container.Tracer(), container.TursoDB(), ) + case "hedging": + container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") + return repositories.NewHedgingHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") + return repositories.NewGormHeartbeatMonitorRepository( + container.Logger(), + container.Tracer(), + container.DedicatedDB(), + ) } - container.logger.Debug("creating GORM repositories.HeartbeatMonitorRepository") - return repositories.NewGormHeartbeatMonitorRepository( - container.Logger(), - container.Tracer(), - container.DedicatedDB(), - ) } // HeartbeatService creates a new instance of services.HeartbeatService @@ -1738,20 +1766,31 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { - if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso" { + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": container.logger.Debug("creating libSQL repositories.HeartbeatRepository") return repositories.NewLibsqlHeartbeatRepository( container.Logger(), container.Tracer(), container.TursoDB(), ) + case "hedging": + container.logger.Debug("creating hedging repositories.HeartbeatRepository") + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + 
repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + container.logger.Debug("creating GORM repositories.HeartbeatRepository") + return repositories.NewGormHeartbeatRepository( + container.Logger(), + container.Tracer(), + container.DedicatedDB(), + ) } - container.logger.Debug("creating GORM repositories.HeartbeatRepository") - return repositories.NewGormHeartbeatRepository( - container.Logger(), - container.Tracer(), - container.DedicatedDB(), - ) } // UserRepository registers a new instance of repositories.UserRepository diff --git a/api/pkg/repositories/hedging_heartbeat_monitor_repository.go b/api/pkg/repositories/hedging_heartbeat_monitor_repository.go new file mode 100644 index 00000000..3304230c --- /dev/null +++ b/api/pkg/repositories/hedging_heartbeat_monitor_repository.go @@ -0,0 +1,128 @@ +package repositories + +import ( + "context" + "fmt" + + "github.com/google/uuid" + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// hedgingHeartbeatMonitorRepository writes to both primary and secondary repositories. +// Reads only hit primary. Secondary writes are fail-open. 
+type hedgingHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatMonitorRepository + secondary HeartbeatMonitorRepository + failureCounter otelMetric.Int64Counter +} + +// NewHedgingHeartbeatMonitorRepository creates a hedging HeartbeatMonitorRepository +func NewHedgingHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatMonitorRepository, + secondary HeartbeatMonitorRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatMonitorRepository { + return &hedgingHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatMonitorRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} + +func (repository *hedgingHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Store(ctx, monitor); err != nil { + return err + } + + if err := repository.secondary.Store(ctx, monitor); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for monitor [%s]", monitor.ID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + return repository.primary.Load(ctx, userID, phoneNumber) +} + +func (repository *hedgingHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + return repository.primary.Exists(ctx, userID, monitorID) +} + +func (repository *hedgingHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := 
repository.primary.UpdateQueueID(ctx, monitorID, queueID); err != nil { + return err + } + + if err := repository.secondary.UpdateQueueID(ctx, monitorID, queueID); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdateQueueID failed for monitor [%s]", monitorID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Delete(ctx, userID, phoneNumber); err != nil { + return err + } + + if err := repository.secondary.Delete(ctx, userID, phoneNumber); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for monitor with owner [%s]", phoneNumber))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { + return err + } + + if err := repository.secondary.UpdatePhoneOnline(ctx, userID, monitorID, online); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary UpdatePhoneOnline failed for monitor [%s]", monitorID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { + return err + } + + if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { + 
repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete all failed for user [%s]", userID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} diff --git a/api/pkg/repositories/hedging_heartbeat_repository.go b/api/pkg/repositories/hedging_heartbeat_repository.go new file mode 100644 index 00000000..07346264 --- /dev/null +++ b/api/pkg/repositories/hedging_heartbeat_repository.go @@ -0,0 +1,79 @@ +package repositories + +import ( + "context" + "fmt" + + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// hedgingHeartbeatRepository writes to both primary and secondary repositories. +// Reads only hit primary. Secondary writes are fail-open. +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +// NewHedgingHeartbeatRepository creates a hedging HeartbeatRepository +func NewHedgingHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatRepository, + secondary HeartbeatRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatRepository { + return &hedgingHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} + +func (repository *hedgingHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.Store(ctx, heartbeat); err != nil { + return err + } + + if err := repository.secondary.Store(ctx, heartbeat); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for heartbeat [%s]", 
heartbeat.ID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} + +func (repository *hedgingHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + return repository.primary.Index(ctx, userID, owner, params) +} + +func (repository *hedgingHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + return repository.primary.Last(ctx, userID, owner) +} + +func (repository *hedgingHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + if err := repository.primary.DeleteAllForUser(ctx, userID); err != nil { + return err + } + + if err := repository.secondary.DeleteAllForUser(ctx, userID); err != nil { + repository.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary delete failed for user [%s]", userID))) + repository.failureCounter.Add(ctx, 1) + } + + return nil +} diff --git a/docs/superpowers/plans/2026-05-15-hedging-repository.md b/docs/superpowers/plans/2026-05-15-hedging-repository.md new file mode 100644 index 00000000..e3b838fa --- /dev/null +++ b/docs/superpowers/plans/2026-05-15-hedging-repository.md @@ -0,0 +1,265 @@ +# Hedging Repository Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +> **Status:** All tasks implemented. This plan was written retroactively to document and verify the implementation. + +**Goal:** Create composite hedging repositories that dual-write to GORM (primary) and Turso (secondary) with fail-open semantics on secondary failures. 
+ +**Architecture:** Two new repository files implement the existing `HeartbeatRepository` and `HeartbeatMonitorRepository` interfaces by delegating reads to primary only and writes to both. Secondary write failures are logged and counted via an OTel metric but never propagated. Activated via `HEARTBEAT_DB_BACKEND=hedging`. + +**Tech Stack:** Go, OpenTelemetry metrics (`go.opentelemetry.io/otel/metric`), existing repository interfaces + +**Spec:** `docs/superpowers/specs/2026-05-15-hedging-repository-design.md` + +--- + +### Task 1: Create hedging heartbeat repository ✅ + +**Files:** + +- Created: `api/pkg/repositories/hedging_heartbeat_repository.go` + +- [ ] **Step 1: Create the hedging heartbeat repository file** + +```go +package repositories + +import ( + "context" + "fmt" + + otelMetric "go.opentelemetry.io/otel/metric" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +type hedgingHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + primary HeartbeatRepository + secondary HeartbeatRepository + failureCounter otelMetric.Int64Counter +} + +func NewHedgingHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + primary HeartbeatRepository, + secondary HeartbeatRepository, + failureCounter otelMetric.Int64Counter, +) HeartbeatRepository { + return &hedgingHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &hedgingHeartbeatRepository{})), + tracer: tracer, + primary: primary, + secondary: secondary, + failureCounter: failureCounter, + } +} +``` + +Implement 4 methods: + +- `Store` — write to primary, then secondary (fail-open with log + metric) +- `Index` — delegate to primary only +- `Last` — delegate to primary only +- `DeleteAllForUser` — write to primary, then secondary (fail-open with log + metric) + +Write methods follow this pattern: + +```go +func (r *hedgingHeartbeatRepository) Store(ctx 
context.Context, heartbeat *entities.Heartbeat) error { + ctx, span := r.tracer.Start(ctx) + defer span.End() + + if err := r.primary.Store(ctx, heartbeat); err != nil { + return err + } + + if err := r.secondary.Store(ctx, heartbeat); err != nil { + r.logger.Error(stacktrace.Propagate(err, fmt.Sprintf("hedging: secondary write failed for heartbeat [%s]", heartbeat.ID))) + r.failureCounter.Add(ctx, 1) + } + + return nil +} +``` + +Read methods simply delegate: + +```go +func (r *hedgingHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + return r.primary.Index(ctx, userID, owner, params) +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Commit** + +```bash +git add api/pkg/repositories/hedging_heartbeat_repository.go +git commit -m "feat(api): add hedging heartbeat repository" +``` + +--- + +### Task 2: Create hedging heartbeat monitor repository ✅ + +**Files:** + +- Created: `api/pkg/repositories/hedging_heartbeat_monitor_repository.go` + +- [ ] **Step 1: Create the hedging heartbeat monitor repository file** + +Same struct pattern as Task 1 but wrapping `HeartbeatMonitorRepository` interface. + +Implement 7 methods: + +| Method | Behavior | +| ------------------- | ---------------------- | +| `Store` | Write both (fail-open) | +| `Load` | Primary only | +| `Exists` | Primary only | +| `UpdateQueueID` | Write both (fail-open) | +| `Delete` | Write both (fail-open) | +| `UpdatePhoneOnline` | Write both (fail-open) | +| `DeleteAllForUser` | Write both (fail-open) | + +All write methods follow the same fail-open pattern: primary must succeed, secondary logs + increments counter on failure. 
+ +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Commit** + +```bash +git add api/pkg/repositories/hedging_heartbeat_monitor_repository.go +git commit -m "feat(api): add hedging heartbeat monitor repository" +``` + +--- + +### Task 3: Add HedgingFailureCounter to DI container ✅ + +**Files:** + +- Modified: `api/pkg/di/container.go` (added `HedgingFailureCounter()` method after `TursoDB()`, ~line 320) + +- [ ] **Step 1: Add the HedgingFailureCounter method** + +```go +func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { + meter := otel.GetMeterProvider().Meter( + container.projectID, + otelMetric.WithInstrumentationVersion(otel.Version()), + ) + counter, err := meter.Int64Counter( + "hedging.secondary.write.failures", + otelMetric.WithUnit("1"), + otelMetric.WithDescription("Number of failed secondary writes in hedging repositories"), + ) + if err != nil { + container.logger.Fatal(stacktrace.Propagate(err, "cannot create hedging failure counter")) + } + return counter +} +``` + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +--- + +### Task 4: Wire hedging mode in DI container ✅ + +**Files:** + +- Modified: `api/pkg/di/container.go` + + - `HeartbeatRepository()` (~line 1768) + - `HeartbeatMonitorRepository()` (~line 930) + +- [ ] **Step 1: Change both methods from if/else to switch** + +Replace the existing `if os.Getenv("HEARTBEAT_DB_BACKEND") == "turso"` with a switch: + +```go +func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { + switch os.Getenv("HEARTBEAT_DB_BACKEND") { + case "turso": + // existing libSQL path + case "hedging": + return repositories.NewHedgingHeartbeatRepository( + container.Logger(), + container.Tracer(), + repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), + repositories.NewLibsqlHeartbeatRepository(container.Logger(), 
container.Tracer(), container.TursoDB()), + container.HedgingFailureCounter(), + ) + default: + // existing GORM path + } +} +``` + +Same pattern for `HeartbeatMonitorRepository()`. + +- [ ] **Step 2: Verify build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 3: Run pre-commit hooks** + +Run: `cd api && gofumpt -w pkg/repositories/hedging_heartbeat_repository.go pkg/repositories/hedging_heartbeat_monitor_repository.go pkg/di/container.go` +Expected: exit code 0 + +- [ ] **Step 4: Final commit** + +```bash +git add api/pkg/di/container.go +git commit -m "feat(api): wire hedging mode in DI container (HEARTBEAT_DB_BACKEND=hedging)" +``` + +--- + +### Task 5: Final verification + +- [ ] **Step 1: Full build** + +Run: `cd api && go build ./...` +Expected: exit code 0 + +- [ ] **Step 2: Run tests** + +Run: `cd api && go test ./...` +Expected: all tests pass + +- [ ] **Step 3: go vet (no new warnings)** + +Run: `cd api && go vet ./... 2>&1 | Select-String "hedging"` +Expected: only pre-existing `non-constant format string` warnings (same category as all other repos) + +- [ ] **Step 4: Pre-commit hooks pass** + +Run: `git add -A && git commit --dry-run` +Expected: all hooks pass (go-fumpt, go-lint, go-imports, go-mod-tidy) + +- [ ] **Step 5: Verify three modes work (code review)** + +Check that the DI container correctly handles all three values: + +- Default (unset) → GORM only +- `turso` → libSQL only +- `hedging` → GORM primary + libSQL secondary From 4263fdac857409cb9a635bf99e70d17e5015c868 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:47:20 +0300 Subject: [PATCH 06/14] test(api): add hedging heartbeat integration test with Turso Add sqld (libSQL server) to test docker-compose. Integration test stores a heartbeat via the hedging repository and reads it back from both PostgreSQL (primary) and Turso/libSQL (secondary) to verify dual-write. Gated by TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL environment variables. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../hedging_heartbeat_repository_test.go | 91 +++++++++++++++++++ tests/docker-compose.yml | 10 ++ 2 files changed, 101 insertions(+) create mode 100644 api/pkg/repositories/hedging_heartbeat_repository_test.go diff --git a/api/pkg/repositories/hedging_heartbeat_repository_test.go b/api/pkg/repositories/hedging_heartbeat_repository_test.go new file mode 100644 index 00000000..c70d0336 --- /dev/null +++ b/api/pkg/repositories/hedging_heartbeat_repository_test.go @@ -0,0 +1,91 @@ +package repositories + +import ( + "context" + "os" + "testing" + "time" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/google/uuid" + "github.com/hirosassa/zerodriver" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func TestHedgingHeartbeatRepository_StoreAndRead(t *testing.T) { + postgresURL := os.Getenv("TEST_DATABASE_URL") + tursoURL := os.Getenv("TEST_TURSO_DATABASE_URL") + if postgresURL == "" || tursoURL == "" { + t.Skip("TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL must be set for integration tests") + } + + // Setup PostgreSQL (primary) + gormDB, err := gorm.Open(postgres.Open(postgresURL), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, gormDB.AutoMigrate(&entities.Heartbeat{})) + + // Setup Turso/libSQL (secondary) + tursoDB, err := NewTursoDB(tursoURL, os.Getenv("TEST_TURSO_AUTH_TOKEN")) + require.NoError(t, err) + defer tursoDB.Close() + + // Telemetry + driver := zerodriver.NewProductionLogger() + logger := telemetry.NewZerologLogger("test", map[string]string{}, driver, nil) + tracer := telemetry.NewOtelLogger("test", logger) + + failureCounter, err := otel.Meter("test").Int64Counter("hedging.test.failures") + require.NoError(t, err) + + // Build repositories + primaryRepo := NewGormHeartbeatRepository(logger, 
tracer, gormDB) + secondaryRepo := NewLibsqlHeartbeatRepository(logger, tracer, tursoDB) + hedgingRepo := NewHedgingHeartbeatRepository(logger, tracer, primaryRepo, secondaryRepo, failureCounter) + + // Create test heartbeat + now := time.Now().UTC().Truncate(time.Second) + heartbeat := &entities.Heartbeat{ + ID: uuid.New(), + Owner: "+18005551234", + Version: "test-v1", + Charging: true, + UserID: entities.UserID("test-user-" + uuid.New().String()), + Timestamp: now, + } + + // Store via hedging repo (writes to both stores) + ctx := context.Background() + err = hedgingRepo.Store(ctx, heartbeat) + require.NoError(t, err) + + // Read from primary (PostgreSQL) + primaryResult, err := primaryRepo.Last(ctx, heartbeat.UserID, heartbeat.Owner) + require.NoError(t, err) + assert.Equal(t, heartbeat.ID, primaryResult.ID) + assert.Equal(t, heartbeat.Owner, primaryResult.Owner) + assert.Equal(t, heartbeat.Version, primaryResult.Version) + assert.Equal(t, heartbeat.Charging, primaryResult.Charging) + assert.Equal(t, heartbeat.UserID, primaryResult.UserID) + assert.WithinDuration(t, heartbeat.Timestamp, primaryResult.Timestamp, time.Second) + + // Read from secondary (Turso/libSQL) + secondaryResult, err := secondaryRepo.Last(ctx, heartbeat.UserID, heartbeat.Owner) + require.NoError(t, err) + assert.Equal(t, heartbeat.ID, secondaryResult.ID) + assert.Equal(t, heartbeat.Owner, secondaryResult.Owner) + assert.Equal(t, heartbeat.Version, secondaryResult.Version) + assert.Equal(t, heartbeat.Charging, secondaryResult.Charging) + assert.Equal(t, heartbeat.UserID, secondaryResult.UserID) + assert.WithinDuration(t, heartbeat.Timestamp, secondaryResult.Timestamp, time.Second) + + // Verify both stores have the same data + assert.Equal(t, primaryResult.ID, secondaryResult.ID) + + // Cleanup + require.NoError(t, hedgingRepo.DeleteAllForUser(ctx, heartbeat.UserID)) +} diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index a4d6b9dc..d0a4502e 100644 --- 
a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -25,6 +25,16 @@ services: timeout: 5s retries: 10 + sqld: + image: ghcr.io/tursodatabase/libsql-server:latest + ports: + - "8090:8080" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8080/health"] + interval: 5s + timeout: 5s + retries: 10 + wiremock: image: wiremock/wiremock:3x ports: From 6c1eddfa00c80fdc110eda5094630dfb50217d4e Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:49:02 +0300 Subject: [PATCH 07/14] ci: run hedging integration test in GitHub Actions Wait for sqld health before running tests. Set TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL env vars pointing to docker compose services. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .github/workflows/api.yml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml index beb99182..36cd8ea6 100644 --- a/.github/workflows/api.yml +++ b/.github/workflows/api.yml @@ -37,6 +37,21 @@ jobs: - name: Wait for services to be healthy working-directory: ./tests run: | + echo "Waiting for sqld to be healthy..." + for i in $(seq 1 20); do + if curl -sf http://localhost:8090/health >/dev/null 2>&1; then + echo "sqld is healthy!" + break + fi + if [ $i -eq 20 ]; then + echo "sqld failed to become healthy" + docker compose logs sqld + exit 1 + fi + echo "sqld attempt $i/20 - waiting 3s..." + sleep 3 + done + echo "Waiting for API to be healthy..." 
for i in $(seq 1 40); do if docker compose exec api curl -sf http://localhost:8000/health >/dev/null 2>&1; then @@ -59,6 +74,13 @@ jobs: docker compose wait seed || true sleep 2 + - name: Run Hedging Repository Integration Tests + working-directory: ./api + env: + TEST_DATABASE_URL: postgresql://dbusername:dbpassword@localhost:5435/httpsms + TEST_TURSO_DATABASE_URL: http://localhost:8090 + run: go test -vet=off -v -timeout 120s -run TestHedging ./pkg/repositories/ + - name: Run Integration Tests working-directory: ./tests run: go test -v -timeout 300s ./... From 244548a3f48d23f7eb6631035f9217bd90a02147 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Fri, 15 May 2026 23:54:57 +0300 Subject: [PATCH 08/14] test: add black-box heartbeat integration test with hedging Store a heartbeat via POST /v1/heartbeats and read it back via the Index endpoint. The API is configured with HEARTBEAT_DB_BACKEND=hedging so it dual-writes to both PostgreSQL and Turso/sqld. The test only interacts with the HTTP API, no implementation details exposed. 
- Add sqld dependency to API service in docker-compose - Add HEARTBEAT_DB_BACKEND, TURSO_DATABASE_URL to .env.test - Remove repo-level integration test in favor of black-box test - Keep sqld health wait in CI workflow Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .github/workflows/api.yml | 7 -- .../hedging_heartbeat_repository_test.go | 91 ------------------- tests/.env.test | 3 + tests/docker-compose.yml | 2 + tests/integration_test.go | 44 +++++++++ 5 files changed, 49 insertions(+), 98 deletions(-) delete mode 100644 api/pkg/repositories/hedging_heartbeat_repository_test.go diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml index 36cd8ea6..e8044d5c 100644 --- a/.github/workflows/api.yml +++ b/.github/workflows/api.yml @@ -74,13 +74,6 @@ jobs: docker compose wait seed || true sleep 2 - - name: Run Hedging Repository Integration Tests - working-directory: ./api - env: - TEST_DATABASE_URL: postgresql://dbusername:dbpassword@localhost:5435/httpsms - TEST_TURSO_DATABASE_URL: http://localhost:8090 - run: go test -vet=off -v -timeout 120s -run TestHedging ./pkg/repositories/ - - name: Run Integration Tests working-directory: ./tests run: go test -v -timeout 300s ./... 
diff --git a/api/pkg/repositories/hedging_heartbeat_repository_test.go b/api/pkg/repositories/hedging_heartbeat_repository_test.go deleted file mode 100644 index c70d0336..00000000 --- a/api/pkg/repositories/hedging_heartbeat_repository_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package repositories - -import ( - "context" - "os" - "testing" - "time" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/google/uuid" - "github.com/hirosassa/zerodriver" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -func TestHedgingHeartbeatRepository_StoreAndRead(t *testing.T) { - postgresURL := os.Getenv("TEST_DATABASE_URL") - tursoURL := os.Getenv("TEST_TURSO_DATABASE_URL") - if postgresURL == "" || tursoURL == "" { - t.Skip("TEST_DATABASE_URL and TEST_TURSO_DATABASE_URL must be set for integration tests") - } - - // Setup PostgreSQL (primary) - gormDB, err := gorm.Open(postgres.Open(postgresURL), &gorm.Config{}) - require.NoError(t, err) - require.NoError(t, gormDB.AutoMigrate(&entities.Heartbeat{})) - - // Setup Turso/libSQL (secondary) - tursoDB, err := NewTursoDB(tursoURL, os.Getenv("TEST_TURSO_AUTH_TOKEN")) - require.NoError(t, err) - defer tursoDB.Close() - - // Telemetry - driver := zerodriver.NewProductionLogger() - logger := telemetry.NewZerologLogger("test", map[string]string{}, driver, nil) - tracer := telemetry.NewOtelLogger("test", logger) - - failureCounter, err := otel.Meter("test").Int64Counter("hedging.test.failures") - require.NoError(t, err) - - // Build repositories - primaryRepo := NewGormHeartbeatRepository(logger, tracer, gormDB) - secondaryRepo := NewLibsqlHeartbeatRepository(logger, tracer, tursoDB) - hedgingRepo := NewHedgingHeartbeatRepository(logger, tracer, primaryRepo, secondaryRepo, failureCounter) - - // Create test heartbeat - now := time.Now().UTC().Truncate(time.Second) - heartbeat 
:= &entities.Heartbeat{ - ID: uuid.New(), - Owner: "+18005551234", - Version: "test-v1", - Charging: true, - UserID: entities.UserID("test-user-" + uuid.New().String()), - Timestamp: now, - } - - // Store via hedging repo (writes to both stores) - ctx := context.Background() - err = hedgingRepo.Store(ctx, heartbeat) - require.NoError(t, err) - - // Read from primary (PostgreSQL) - primaryResult, err := primaryRepo.Last(ctx, heartbeat.UserID, heartbeat.Owner) - require.NoError(t, err) - assert.Equal(t, heartbeat.ID, primaryResult.ID) - assert.Equal(t, heartbeat.Owner, primaryResult.Owner) - assert.Equal(t, heartbeat.Version, primaryResult.Version) - assert.Equal(t, heartbeat.Charging, primaryResult.Charging) - assert.Equal(t, heartbeat.UserID, primaryResult.UserID) - assert.WithinDuration(t, heartbeat.Timestamp, primaryResult.Timestamp, time.Second) - - // Read from secondary (Turso/libSQL) - secondaryResult, err := secondaryRepo.Last(ctx, heartbeat.UserID, heartbeat.Owner) - require.NoError(t, err) - assert.Equal(t, heartbeat.ID, secondaryResult.ID) - assert.Equal(t, heartbeat.Owner, secondaryResult.Owner) - assert.Equal(t, heartbeat.Version, secondaryResult.Version) - assert.Equal(t, heartbeat.Charging, secondaryResult.Charging) - assert.Equal(t, heartbeat.UserID, secondaryResult.UserID) - assert.WithinDuration(t, heartbeat.Timestamp, secondaryResult.Timestamp, time.Second) - - // Verify both stores have the same data - assert.Equal(t, primaryResult.ID, secondaryResult.ID) - - // Cleanup - require.NoError(t, hedgingRepo.DeleteAllForUser(ctx, heartbeat.UserID)) -} diff --git a/tests/.env.test b/tests/.env.test index a95703f0..b8fb679f 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -28,3 +28,6 @@ PUSHER_CLUSTER= GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= +HEARTBEAT_DB_BACKEND=hedging +TURSO_DATABASE_URL=http://sqld:8080 +TURSO_AUTH_TOKEN= diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index d0a4502e..3b57a48e 100644 --- 
a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -63,6 +63,8 @@ services: condition: service_healthy wiremock: condition: service_healthy + sqld: + condition: service_healthy env_file: - .env.test environment: diff --git a/tests/integration_test.go b/tests/integration_test.go index 5aae58ff..b70f34b2 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -346,3 +346,47 @@ func TestSendSMS_OutstandingFlow(t *testing.T) { assertWebhookJWT(t, req.Request, signingKey) } } + +func TestHeartbeat_StoreAndIndex(t *testing.T) { + ctx := context.Background() + phone := setupPhone(ctx, t, 60) + + // Store a heartbeat via phone API key + storePayload := map[string]interface{}{ + "phone_numbers": []string{phone.PhoneNumber}, + "charging": true, + } + body, err := json.Marshal(storePayload) + require.NoError(t, err) + + url := apiBaseURL + "/v1/heartbeats" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-api-key", phone.PhoneAPIKey) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, http.StatusCreated, resp.StatusCode, "store heartbeat failed: %s", string(respBody)) + + // Read heartbeats back via user API key + client := newAPIClient() + heartbeats, indexResp, err := client.Heartbeats.Index(ctx, &httpsms.HeartbeatIndexParams{ + Owner: phone.PhoneNumber, + Limit: 1, + }) + require.NoError(t, err) + require.Equal(t, http.StatusOK, indexResp.HTTPResponse.StatusCode) + + require.NotNil(t, heartbeats) + require.GreaterOrEqual(t, len(heartbeats.Data), 1, "expected at least 1 heartbeat") + + hb := heartbeats.Data[0] + assert.Equal(t, phone.PhoneNumber, hb.Owner) + assert.True(t, hb.Charging) + assert.False(t, hb.Timestamp.IsZero(), "timestamp should not be zero") +} From 
91e75b8f5e82093845954e8c98aee7f73aaf4b4f Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:30:55 +0300 Subject: [PATCH 09/14] fix(tests): remove curl-based healthcheck from sqld container The ghcr.io/tursodatabase/libsql-server:latest image is based on debian:bullseye-slim and does not include curl. The health check was always failing, causing the container to be reported as unhealthy and blocking the api service from starting. Instead, use service_started condition since sqld starts nearly instantly and the workflow already has an explicit health polling step that checks sqld readiness from the host before running tests. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- tests/docker-compose.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 3b57a48e..1e111b78 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -29,11 +29,6 @@ services: image: ghcr.io/tursodatabase/libsql-server:latest ports: - "8090:8080" - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8080/health"] - interval: 5s - timeout: 5s - retries: 10 wiremock: image: wiremock/wiremock:3x @@ -64,7 +59,7 @@ services: wiremock: condition: service_healthy sqld: - condition: service_healthy + condition: service_started env_file: - .env.test environment: From fe1785992019f076d4e6009983c9e2178a15da13 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:34:18 +0300 Subject: [PATCH 10/14] refactor(api): combine Turso URL and auth token into single DSN env var Replace TURSO_DATABASE_URL and TURSO_AUTH_TOKEN with a single TURSO_DATABASE_DSN that contains the full connection string including the authToken query parameter. This simplifies configuration and aligns with standard DSN conventions. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 5 +---- api/pkg/repositories/libsql.go | 7 +++---- .../2026-05-15-turso-heartbeat-backend-design.md | 11 +++++------ tests/.env.test | 3 +-- 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index d771813f..fbc69eaf 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -305,10 +305,7 @@ func (container *Container) TursoDB() *sql.DB { container.logger.Debug("creating Turso *sql.DB connection") - db, err := repositories.NewTursoDB( - os.Getenv("TURSO_DATABASE_URL"), - os.Getenv("TURSO_AUTH_TOKEN"), - ) + db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) if err != nil { container.logger.Fatal(err) } diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go index 388c475c..66ee8c0e 100644 --- a/api/pkg/repositories/libsql.go +++ b/api/pkg/repositories/libsql.go @@ -15,15 +15,14 @@ const ( ) // NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables -func NewTursoDB(url, authToken string) (*sql.DB, error) { - dsn := url + "?authToken=" + authToken +func NewTursoDB(dsn string) (*sql.DB, error) { db, err := sql.Open("libsql", dsn) if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database at [%s]", url)) + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database with DSN [%s]", dsn)) } if err = db.Ping(); err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database at [%s]", url)) + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database with DSN [%s]", dsn)) } if err = createTursoTables(db); err != nil { diff --git a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md index da489276..b501650f 100644 --- 
a/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md +++ b/docs/superpowers/specs/2026-05-15-turso-heartbeat-backend-design.md @@ -13,11 +13,10 @@ Move heartbeat storage to a dedicated Turso database for cost efficiency and edg ## Configuration -| Env Var | Purpose | Example | -| ---------------------- | -------------------------------------------------------------- | ----------------------------------------------------- | -| `HEARTBEAT_DB_BACKEND` | Selects backend (`turso` = libSQL, anything else = PostgreSQL) | `turso` | -| `TURSO_DATABASE_URL` | Turso database URL | `libsql://httpsms-ndolestudio.aws-us-east-1.turso.io` | -| `TURSO_AUTH_TOKEN` | Turso auth token | `eyJ...` | +| Env Var | Purpose | Example | +| ---------------------- | -------------------------------------------------------------- | ---------------------------------------------------------------------- | +| `HEARTBEAT_DB_BACKEND` | Selects backend (`turso` = libSQL, anything else = PostgreSQL) | `turso` | +| `TURSO_DATABASE_DSN` | Turso database DSN (URL with authToken query param) | `libsql://httpsms-ndolestudio.aws-us-east-1.turso.io?authToken=eyJ...` | When `HEARTBEAT_DB_BACKEND` is not set or is any value other than `turso`, the existing GORM/PostgreSQL path is used unchanged. 
@@ -132,7 +131,7 @@ func (container *Container) TursoDB() *sql.DB { if container.tursoDB != nil { return container.tursoDB } - db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_URL"), os.Getenv("TURSO_AUTH_TOKEN")) + db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) if err != nil { container.logger.Fatal(err) } diff --git a/tests/.env.test b/tests/.env.test index b8fb679f..909902ae 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -29,5 +29,4 @@ GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= HEARTBEAT_DB_BACKEND=hedging -TURSO_DATABASE_URL=http://sqld:8080 -TURSO_AUTH_TOKEN= +TURSO_DATABASE_DSN=http://sqld:8080 From b01227a1ab98bbb3836903afde62ba8eac49da47 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:44:15 +0300 Subject: [PATCH 11/14] fix(tests): retry heartbeat store to wait for async phone-API-key association The phone API key gets its phone numbers associated asynchronously via the PhoneUpdated event. In the emulator queue mode used by CI, this event is processed in a background goroutine. The heartbeat test was calling the store endpoint immediately, before the async event had associated the phone number with the API key, resulting in a 401. Add a retry loop (up to 15s) consistent with other integration tests that use polling patterns (waitForFCMPush, waitForWebhookEvents). 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 16 ++++++--------- tests/integration_test.go | 42 +++++++++++++++++++++++++-------------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index fbc69eaf..ce6a82e2 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -271,19 +271,16 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { container.logger.Fatal(err) } - sqlDB, err := db.DB() - if err != nil { - container.logger.Fatal(stacktrace.Propagate(err, "cannot get sql.DB from GORM")) - } - - sqlDB.SetMaxOpenConns(1) - sqlDB.SetMaxIdleConns(0) - sqlDB.SetConnMaxLifetime(10 * time.Second) - if err = db.Use(tracing.NewPlugin()); err != nil { container.logger.Fatal(stacktrace.Propagate(err, "cannot use GORM tracing plugin")) } + container.dedicatedDB = db + if os.Getenv("DATABASE_MIGRATION_SKIP") != "" { + container.logger.Debug(fmt.Sprintf("skipping migrations for [%T]", db)) + return container.dedicatedDB + } + container.logger.Debug(fmt.Sprintf("Running migrations for dedicated [%T]", db)) if err = db.AutoMigrate(&entities.Heartbeat{}); err != nil { container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.Heartbeat{}))) @@ -293,7 +290,6 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { container.logger.Fatal(stacktrace.Propagate(err, fmt.Sprintf("cannot migrate %T", &entities.HeartbeatMonitor{}))) } - container.dedicatedDB = db return container.dedicatedDB } diff --git a/tests/integration_test.go b/tests/integration_test.go index b70f34b2..12776aac 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -351,27 +351,39 @@ func TestHeartbeat_StoreAndIndex(t *testing.T) { ctx := context.Background() phone := setupPhone(ctx, t, 60) - // Store a heartbeat via phone API key + // Store a heartbeat via phone API key (retry to allow async phone-API-key association) 
storePayload := map[string]interface{}{ "phone_numbers": []string{phone.PhoneNumber}, "charging": true, } - body, err := json.Marshal(storePayload) - require.NoError(t, err) url := apiBaseURL + "/v1/heartbeats" - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) - require.NoError(t, err) - req.Header.Set("Content-Type", "application/json") - req.Header.Set("x-api-key", phone.PhoneAPIKey) - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - defer resp.Body.Close() - - respBody, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.Equal(t, http.StatusCreated, resp.StatusCode, "store heartbeat failed: %s", string(respBody)) + var respBody []byte + var statusCode int + deadline := time.Now().Add(15 * time.Second) + for time.Now().Before(deadline) { + body, err := json.Marshal(storePayload) + require.NoError(t, err) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-api-key", phone.PhoneAPIKey) + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + + respBody, err = io.ReadAll(resp.Body) + resp.Body.Close() + require.NoError(t, err) + + statusCode = resp.StatusCode + if statusCode == http.StatusCreated { + break + } + time.Sleep(500 * time.Millisecond) + } + require.Equal(t, http.StatusCreated, statusCode, "store heartbeat failed: %s", string(respBody)) // Read heartbeats back via user API key client := newAPIClient() From 5bd3afc5f70eea5fa617a176b290da97560d187a Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:49:49 +0300 Subject: [PATCH 12/14] fix(api): handle rows.Err() and uuid.Parse errors in libsql repositories - Add rows.Err() check after iteration loop in Index to catch network errors or timeouts that silently end iteration - Propagate uuid.Parse errors in scanHeartbeat and scanHeartbeatRow instead of discarding them 
with _ - Propagate uuid.Parse errors in scanHeartbeatMonitorRow for both monitor ID and phone ID fields Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../libsql_heartbeat_monitor_repository.go | 10 ++++++++-- .../libsql_heartbeat_repository.go | 18 +++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go index 3987b058..6c6913a7 100644 --- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go @@ -185,8 +185,14 @@ func scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { if err != nil { return nil, err } - monitor.ID, _ = uuid.Parse(id) - monitor.PhoneID, _ = uuid.Parse(phoneID) + monitor.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", id)) + } + monitor.PhoneID, err = uuid.Parse(phoneID) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", phoneID)) + } monitor.UserID = entities.UserID(userID) monitor.PhoneOnline = phoneOnline != 0 return monitor, nil diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go index 34f4398f..df3f843f 100644 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -33,9 +33,11 @@ func NewLibsqlHeartbeatRepository( } func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { - ctx, span := repository.tracer.Start(ctx) + ctx, span, ctxLogger := repository.tracer.StartWithLogger(ctx, repository.logger) defer span.End() + ctxLogger.Trace("saving new heartbeat") + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) defer cancel() @@ -93,6 
+95,10 @@ func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID e } heartbeats = append(heartbeats, *heartbeat) } + if rowsErr := rows.Err(); rowsErr != nil { + msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg)) + } return &heartbeats, nil } @@ -147,7 +153,10 @@ func scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { if err != nil { return nil, err } - heartbeat.ID, _ = uuid.Parse(id) + heartbeat.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) + } heartbeat.Charging = charging != 0 heartbeat.UserID = entities.UserID(userID) return heartbeat, nil @@ -162,7 +171,10 @@ func scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { if err != nil { return nil, err } - heartbeat.ID, _ = uuid.Parse(id) + heartbeat.ID, err = uuid.Parse(id) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) + } heartbeat.Charging = charging != 0 heartbeat.UserID = entities.UserID(userID) return heartbeat, nil From 8db9e03e570b4d5ff0e3a7392505cfd25327aa5c Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:55:43 +0300 Subject: [PATCH 13/14] refactor(api): make scan functions methods on libsql repository structs Convert package-level functions scanHeartbeat, scanHeartbeatRow, and scanHeartbeatMonitorRow into methods on their respective repository structs for consistency with the repository pattern. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../repositories/libsql_heartbeat_monitor_repository.go | 4 ++-- api/pkg/repositories/libsql_heartbeat_repository.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go index 6c6913a7..0cb594af 100644 --- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go @@ -71,7 +71,7 @@ func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, us string(userID), phoneNumber, ) - monitor, err := scanHeartbeatMonitorRow(row) + monitor, err := repository.scanHeartbeatMonitorRow(row) if err == sql.ErrNoRows { msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) @@ -177,7 +177,7 @@ func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context return nil } -func scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { +func (repository *libsqlHeartbeatMonitorRepository) scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { monitor := new(entities.HeartbeatMonitor) var id, phoneID, userID string var phoneOnline int diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go index df3f843f..0cf74025 100644 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -88,7 +88,7 @@ func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID e heartbeats := make([]entities.Heartbeat, 0) for rows.Next() { - heartbeat, scanErr := scanHeartbeat(rows) + heartbeat, scanErr := repository.scanHeartbeat(rows) if scanErr != nil { msg := 
fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) @@ -115,7 +115,7 @@ func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID en string(userID), owner, ) - heartbeat, err := scanHeartbeatRow(row) + heartbeat, err := repository.scanHeartbeatRow(row) if err == sql.ErrNoRows { msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) @@ -144,7 +144,7 @@ func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Contex return nil } -func scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { +func (repository *libsqlHeartbeatRepository) scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { heartbeat := new(entities.Heartbeat) var id string var charging int @@ -162,7 +162,7 @@ func scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { return heartbeat, nil } -func scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { +func (repository *libsqlHeartbeatRepository) scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { heartbeat := new(entities.Heartbeat) var id string var charging int From 0097a60ba40c3bcb72f3b9cf924fe6040c365f70 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 11:59:18 +0300 Subject: [PATCH 14/14] Remove unneeded log --- api/pkg/repositories/libsql_heartbeat_repository.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go index 0cf74025..42fdf911 100644 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ b/api/pkg/repositories/libsql_heartbeat_repository.go @@ -33,11 +33,9 @@ func NewLibsqlHeartbeatRepository( } func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat
*entities.Heartbeat) error { - ctx, span, ctxLogger := repository.tracer.StartWithLogger(ctx, repository.logger) + ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) defer span.End() - ctxLogger.Trace("saving new heartbeat") - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) defer cancel()