From f944650b3a9e79ea5f6c68bab282a15f25926e2a Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:07:44 +0300
Subject: [PATCH 01/10] feat: replace libSQL/Turso with MongoDB Atlas for
heartbeat storage
- Add MongoDB Go driver v2 repository implementations for HeartbeatRepository
and HeartbeatMonitorRepository interfaces
- Create mongodb.go connection helper with Atlas support and index creation
- Update DI container to wire MongoDB as the hedging secondary backend
- Replace 'turso' case with 'mongodb' case for standalone MongoDB usage
- Update integration test docker-compose to use mongo:7 instead of sqld
- Update .env.test with MongoDB connection string
HEARTBEAT_DB_BACKEND options:
- 'hedging': PostgreSQL primary, MongoDB secondary (fail-open writes)
- 'mongodb': MongoDB only
- default: PostgreSQL only (GORM)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/go.mod | 5 +
api/go.sum | 10 +
api/pkg/di/container.go | 39 ++-
.../mongo_heartbeat_monitor_repository.go | 230 ++++++++++++++++++
.../mongo_heartbeat_repository.go | 181 ++++++++++++++
api/pkg/repositories/mongodb.go | 68 ++++++
tests/.env.test | 2 +-
tests/docker-compose.yml | 19 +-
web/pages/settings/index.vue | 16 +-
9 files changed, 550 insertions(+), 20 deletions(-)
create mode 100644 api/pkg/repositories/mongo_heartbeat_monitor_repository.go
create mode 100644 api/pkg/repositories/mongo_heartbeat_repository.go
create mode 100644 api/pkg/repositories/mongodb.go
diff --git a/api/go.mod b/api/go.mod
index d77c0bf6..24b70d57 100644
--- a/api/go.mod
+++ b/api/go.mod
@@ -47,6 +47,7 @@ require (
github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885
github.com/uptrace/uptrace-go v1.43.0
github.com/xuri/excelize/v2 v2.10.1
+ go.mongodb.org/mongo-driver/v2 v2.6.0
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
@@ -164,8 +165,12 @@ require (
github.com/valyala/fasthttp v1.71.0 // indirect
github.com/vanng822/css v1.0.1 // indirect
github.com/vanng822/go-premailer v1.33.0 // indirect
+ github.com/xdg-go/pbkdf2 v1.0.0 // indirect
+ github.com/xdg-go/scram v1.2.0 // indirect
+ github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xuri/efp v0.0.1 // indirect
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 // indirect
+ github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib v1.43.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.43.0 // indirect
diff --git a/api/go.sum b/api/go.sum
index 0710b094..d5fa9e1d 100644
--- a/api/go.sum
+++ b/api/go.sum
@@ -336,6 +336,12 @@ github.com/vanng822/css v1.0.1 h1:10yiXc4e8NI8ldU6mSrWmSWMuyWgPr9DZ63RSlsgDw8=
github.com/vanng822/css v1.0.1/go.mod h1:tcnB1voG49QhCrwq1W0w5hhGasvOg+VQp9i9H1rCM1w=
github.com/vanng822/go-premailer v1.33.0 h1:nglIpKn/7e3kIAwYByiH5xpauFur7RwAucqyZ59hcic=
github.com/vanng822/go-premailer v1.33.0/go.mod h1:LGYI7ym6FQ7KcHN16LiQRF+tlan7qwhP1KEhpTINFpo=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs=
+github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8=
+github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
+github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8=
github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI=
github.com/xuri/excelize/v2 v2.10.1 h1:V62UlqopMqha3kOpnlHy2CcRVw1V8E63jFoWUmMzxN0=
@@ -344,11 +350,15 @@ github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 h1:+C0TIdyyYmzadGaL/HBL
github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
+github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
+github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE=
github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg=
github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs=
github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s=
+go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8=
+go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ3U=
diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go
index ce6a82e2..94c2722e 100644
--- a/api/pkg/di/container.go
+++ b/api/pkg/di/container.go
@@ -74,6 +74,7 @@ import (
"github.com/NdoleStudio/httpsms/pkg/handlers"
"github.com/NdoleStudio/httpsms/pkg/telemetry"
"github.com/NdoleStudio/httpsms/pkg/validators"
+ mongoDriver "go.mongodb.org/mongo-driver/v2/mongo"
"gorm.io/driver/postgres"
gormLogger "gorm.io/gorm/logger"
)
@@ -84,6 +85,7 @@ type Container struct {
db *gorm.DB
dedicatedDB *gorm.DB
tursoDB *sql.DB
+ mongoClient *mongoDriver.Client
version string
app *fiber.App
eventDispatcher *services.EventDispatcher
@@ -310,6 +312,23 @@ func (container *Container) TursoDB() *sql.DB {
return container.tursoDB
}
+// MongoDB creates a *mongo.Client connection to MongoDB Atlas
+func (container *Container) MongoDB() *mongoDriver.Client {
+ if container.mongoClient != nil {
+ return container.mongoClient
+ }
+
+ container.logger.Debug("creating MongoDB *mongo.Client connection")
+
+ client, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
+ if err != nil {
+ container.logger.Fatal(err)
+ }
+
+ container.mongoClient = client
+ return container.mongoClient
+}
+
// HedgingFailureCounter creates an OTel counter for hedging secondary write failures
func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter {
meter := otel.GetMeterProvider().Meter(
@@ -922,12 +941,12 @@ func (container *Container) MessageThreadRepository() (repository repositories.M
// HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository
func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) {
switch os.Getenv("HEARTBEAT_DB_BACKEND") {
- case "turso":
- container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository")
- return repositories.NewLibsqlHeartbeatMonitorRepository(
+ case "mongodb":
+ container.logger.Debug("creating MongoDB repositories.HeartbeatMonitorRepository")
+ return repositories.NewMongoHeartbeatMonitorRepository(
container.Logger(),
container.Tracer(),
- container.TursoDB(),
+ container.MongoDB(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository")
@@ -935,7 +954,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()),
+ repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()),
container.HedgingFailureCounter(),
)
default:
@@ -1760,12 +1779,12 @@ func (container *Container) RegisterSwaggerRoutes() {
// HeartbeatRepository registers a new instance of repositories.HeartbeatRepository
func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository {
switch os.Getenv("HEARTBEAT_DB_BACKEND") {
- case "turso":
- container.logger.Debug("creating libSQL repositories.HeartbeatRepository")
- return repositories.NewLibsqlHeartbeatRepository(
+ case "mongodb":
+ container.logger.Debug("creating MongoDB repositories.HeartbeatRepository")
+ return repositories.NewMongoHeartbeatRepository(
container.Logger(),
container.Tracer(),
- container.TursoDB(),
+ container.MongoDB(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatRepository")
@@ -1773,7 +1792,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()),
+ repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()),
container.HedgingFailureCounter(),
)
default:
diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
new file mode 100644
index 00000000..1f5f014f
--- /dev/null
+++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
@@ -0,0 +1,230 @@
+package repositories
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/google/uuid"
+ "go.mongodb.org/mongo-driver/v2/bson"
+ "go.mongodb.org/mongo-driver/v2/mongo"
+
+ "github.com/NdoleStudio/httpsms/pkg/entities"
+ "github.com/NdoleStudio/httpsms/pkg/telemetry"
+ "github.com/palantir/stacktrace"
+)
+
+// mongoHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in MongoDB
+type mongoHeartbeatMonitorRepository struct {
+ logger telemetry.Logger
+ tracer telemetry.Tracer
+ collection *mongo.Collection
+}
+
+// NewMongoHeartbeatMonitorRepository creates the MongoDB version of the HeartbeatMonitorRepository
+func NewMongoHeartbeatMonitorRepository(
+ logger telemetry.Logger,
+ tracer telemetry.Tracer,
+ client *mongo.Client,
+) HeartbeatMonitorRepository {
+ return &mongoHeartbeatMonitorRepository{
+ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})),
+ tracer: tracer,
+ collection: client.Database(mongoDBName).Collection(collectionHeartbeatMonitors),
+ }
+}
+
+type heartbeatMonitorDocument struct {
+ ID string `bson:"_id"`
+ PhoneID string `bson:"phone_id"`
+ UserID string `bson:"user_id"`
+ QueueID string `bson:"queue_id"`
+ Owner string `bson:"owner"`
+ PhoneOnline bool `bson:"phone_online"`
+ CreatedAt time.Time `bson:"created_at"`
+ UpdatedAt time.Time `bson:"updated_at"`
+}
+
+func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ doc := heartbeatMonitorDocument{
+ ID: monitor.ID.String(),
+ PhoneID: monitor.PhoneID.String(),
+ UserID: string(monitor.UserID),
+ QueueID: monitor.QueueID,
+ Owner: monitor.Owner,
+ PhoneOnline: monitor.PhoneOnline,
+ CreatedAt: monitor.CreatedAt.UTC(),
+ UpdatedAt: monitor.UpdatedAt.UTC(),
+ }
+
+ _, err := repository.collection.InsertOne(ctx, doc)
+ if err != nil {
+ msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"user_id", string(userID)},
+ {"owner", phoneNumber},
+ }
+
+ var doc heartbeatMonitorDocument
+ err := repository.collection.FindOne(ctx, filter).Decode(&doc)
+ if err == mongo.ErrNoDocuments {
+ msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
+ }
+ if err != nil {
+ msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ monitor, err := docToHeartbeatMonitor(doc)
+ if err != nil {
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat monitor document"))
+ }
+
+ return monitor, nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"user_id", string(userID)},
+ {"_id", monitorID.String()},
+ }
+
+ count, err := repository.collection.CountDocuments(ctx, filter)
+ if err != nil {
+ msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID)
+ return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return count > 0, nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{{"_id", monitorID.String()}}
+ update := bson.D{{"$set", bson.D{
+ {"queue_id", queueID},
+ {"updated_at", time.Now().UTC()},
+ }}}
+
+ _, err := repository.collection.UpdateOne(ctx, filter, update)
+ if err != nil {
+ msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"user_id", string(userID)},
+ {"owner", phoneNumber},
+ }
+
+ _, err := repository.collection.DeleteMany(ctx, filter)
+ if err != nil {
+ msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"_id", monitorID.String()},
+ {"user_id", string(userID)},
+ }
+ update := bson.D{{"$set", bson.D{
+ {"phone_online", online},
+ {"updated_at", time.Now().UTC()},
+ }}}
+
+ _, err := repository.collection.UpdateOne(ctx, filter, update)
+ if err != nil {
+ msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}})
+ if err != nil {
+ msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func docToHeartbeatMonitor(doc heartbeatMonitorDocument) (*entities.HeartbeatMonitor, error) {
+ id, err := uuid.Parse(doc.ID)
+ if err != nil {
+ return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", doc.ID))
+ }
+ phoneID, err := uuid.Parse(doc.PhoneID)
+ if err != nil {
+ return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", doc.PhoneID))
+ }
+ return &entities.HeartbeatMonitor{
+ ID: id,
+ PhoneID: phoneID,
+ UserID: entities.UserID(doc.UserID),
+ QueueID: doc.QueueID,
+ Owner: doc.Owner,
+ PhoneOnline: doc.PhoneOnline,
+ CreatedAt: doc.CreatedAt,
+ UpdatedAt: doc.UpdatedAt,
+ }, nil
+}
diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go
new file mode 100644
index 00000000..13802e06
--- /dev/null
+++ b/api/pkg/repositories/mongo_heartbeat_repository.go
@@ -0,0 +1,181 @@
+package repositories
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/google/uuid"
+ "go.mongodb.org/mongo-driver/v2/bson"
+ "go.mongodb.org/mongo-driver/v2/mongo"
+ "go.mongodb.org/mongo-driver/v2/mongo/options"
+
+ "github.com/NdoleStudio/httpsms/pkg/entities"
+ "github.com/NdoleStudio/httpsms/pkg/telemetry"
+ "github.com/palantir/stacktrace"
+)
+
+// mongoHeartbeatRepository is responsible for persisting entities.Heartbeat in MongoDB
+type mongoHeartbeatRepository struct {
+ logger telemetry.Logger
+ tracer telemetry.Tracer
+ collection *mongo.Collection
+}
+
+// NewMongoHeartbeatRepository creates the MongoDB version of the HeartbeatRepository
+func NewMongoHeartbeatRepository(
+ logger telemetry.Logger,
+ tracer telemetry.Tracer,
+ client *mongo.Client,
+) HeartbeatRepository {
+ return &mongoHeartbeatRepository{
+ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})),
+ tracer: tracer,
+ collection: client.Database(mongoDBName).Collection(collectionHeartbeats),
+ }
+}
+
+type heartbeatDocument struct {
+ ID string `bson:"_id"`
+ Owner string `bson:"owner"`
+ Version string `bson:"version"`
+ Charging bool `bson:"charging"`
+ UserID string `bson:"user_id"`
+ Timestamp time.Time `bson:"timestamp"`
+}
+
+func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error {
+ ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ doc := heartbeatDocument{
+ ID: heartbeat.ID.String(),
+ Owner: heartbeat.Owner,
+ Version: heartbeat.Version,
+ Charging: heartbeat.Charging,
+ UserID: string(heartbeat.UserID),
+ Timestamp: heartbeat.Timestamp.UTC(),
+ }
+
+ _, err := repository.collection.InsertOne(ctx, doc)
+ if err != nil {
+ msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func (repository *mongoHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"user_id", string(userID)},
+ {"owner", owner},
+ }
+
+ if len(params.Query) > 0 {
+ filter = append(filter, bson.E{"version", bson.D{{"$regex", params.Query}, {"$options", "i"}}})
+ }
+
+ opts := options.Find().
+ SetSort(bson.D{{"timestamp", -1}}).
+ SetSkip(int64(params.Skip)).
+ SetLimit(int64(params.Limit))
+
+ cursor, err := repository.collection.Find(ctx, filter, opts)
+ if err != nil {
+ msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+ defer cursor.Close(ctx)
+
+ var docs []heartbeatDocument
+ if err = cursor.All(ctx, &docs); err != nil {
+ msg := fmt.Sprintf("cannot decode heartbeats for owner [%s]", owner)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ heartbeats := make([]entities.Heartbeat, 0, len(docs))
+ for _, doc := range docs {
+ hb, convertErr := docToHeartbeat(doc)
+ if convertErr != nil {
+ msg := fmt.Sprintf("cannot convert heartbeat document for owner [%s]", owner)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(convertErr, msg))
+ }
+ heartbeats = append(heartbeats, *hb)
+ }
+
+ return &heartbeats, nil
+}
+
+func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ filter := bson.D{
+ {"user_id", string(userID)},
+ {"owner", owner},
+ }
+
+ opts := options.FindOne().SetSort(bson.D{{"timestamp", -1}})
+
+ var doc heartbeatDocument
+ err := repository.collection.FindOne(ctx, filter, opts).Decode(&doc)
+ if err == mongo.ErrNoDocuments {
+ msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
+ }
+ if err != nil {
+ msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner)
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ heartbeat, err := docToHeartbeat(doc)
+ if err != nil {
+ return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat document"))
+ }
+
+ return heartbeat, nil
+}
+
+func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
+ ctx, span := repository.tracer.Start(ctx)
+ defer span.End()
+
+ ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
+ defer cancel()
+
+ _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}})
+ if err != nil {
+ msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID)
+ return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
+ }
+
+ return nil
+}
+
+func docToHeartbeat(doc heartbeatDocument) (*entities.Heartbeat, error) {
+ id, err := uuid.Parse(doc.ID)
+ if err != nil {
+ return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", doc.ID))
+ }
+ return &entities.Heartbeat{
+ ID: id,
+ Owner: doc.Owner,
+ Version: doc.Version,
+ Charging: doc.Charging,
+ UserID: entities.UserID(doc.UserID),
+ Timestamp: doc.Timestamp,
+ }, nil
+}
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
new file mode 100644
index 00000000..80328bed
--- /dev/null
+++ b/api/pkg/repositories/mongodb.go
@@ -0,0 +1,68 @@
+package repositories
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "go.mongodb.org/mongo-driver/v2/bson"
+ "go.mongodb.org/mongo-driver/v2/mongo"
+ "go.mongodb.org/mongo-driver/v2/mongo/options"
+
+ "github.com/palantir/stacktrace"
+)
+
+const (
+ mongoDBName = "httpsms"
+ collectionHeartbeats = "heartbeats"
+ collectionHeartbeatMonitors = "heartbeat_monitors"
+)
+
+// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes
+func NewMongoDB(uri string) (*mongo.Client, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ serverAPI := options.ServerAPI(options.ServerAPIVersion1)
+ opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI)
+
+ client, err := mongo.Connect(opts)
+ if err != nil {
+ return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
+ }
+
+ if err = client.Ping(ctx, nil); err != nil {
+ return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
+ }
+
+ if err = createMongoIndexes(ctx, client); err != nil {
+ return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes")
+ }
+
+ return client, nil
+}
+
+func createMongoIndexes(ctx context.Context, client *mongo.Client) error {
+ db := client.Database(mongoDBName)
+
+ // Heartbeats indexes
+ heartbeatsCol := db.Collection(collectionHeartbeats)
+ _, err := heartbeatsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{
+ {Keys: bson.D{{"owner", 1}, {"timestamp", -1}}},
+ {Keys: bson.D{{"user_id", 1}}},
+ })
+ if err != nil {
+ return stacktrace.Propagate(err, "cannot create indexes on heartbeats collection")
+ }
+
+ // Heartbeat monitors indexes
+ monitorsCol := db.Collection(collectionHeartbeatMonitors)
+ _, err = monitorsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{
+ {Keys: bson.D{{"user_id", 1}, {"owner", 1}}},
+ })
+ if err != nil {
+ return stacktrace.Propagate(err, "cannot create indexes on heartbeat_monitors collection")
+ }
+
+ return nil
+}
diff --git a/tests/.env.test b/tests/.env.test
index 909902ae..3cec6a66 100644
--- a/tests/.env.test
+++ b/tests/.env.test
@@ -29,4 +29,4 @@ GCS_BUCKET_NAME=
UPTRACE_DSN=
CLOUDFLARE_TURNSTILE_SECRET_KEY=
HEARTBEAT_DB_BACKEND=hedging
-TURSO_DATABASE_DSN=http://sqld:8080
+MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
index 1e111b78..515b6298 100644
--- a/tests/docker-compose.yml
+++ b/tests/docker-compose.yml
@@ -25,10 +25,19 @@ services:
timeout: 5s
retries: 10
- sqld:
- image: ghcr.io/tursodatabase/libsql-server:latest
+ mongodb:
+ image: mongo:7
ports:
- - "8090:8080"
+ - "27017:27017"
+ environment:
+ MONGO_INITDB_ROOT_USERNAME: httpsms
+ MONGO_INITDB_ROOT_PASSWORD: testpassword
+ healthcheck:
+ test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
+ interval: 5s
+ timeout: 5s
+ retries: 10
+ start_period: 5s
wiremock:
image: wiremock/wiremock:3x
@@ -58,8 +67,8 @@ services:
condition: service_healthy
wiremock:
condition: service_healthy
- sqld:
- condition: service_started
+ mongodb:
+ condition: service_healthy
env_file:
- .env.test
environment:
diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue
index ebe95562..877565ec 100644
--- a/web/pages/settings/index.vue
+++ b/web/pages/settings/index.vue
@@ -435,10 +435,18 @@
-
- {{ mdiCalendarClock }}
- Create Send Schedule
-
+
+
+ {{ mdiCalendarClock }}
+ Create Send Schedule
+
+ Documentation
+
Email Notifications
From 440faa5b00ccbd21c2b416f576e9b828da0f76ee Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:10:28 +0300
Subject: [PATCH 02/10] chore: remove unused libSQL/Turso files and
dependencies
- Delete libsql.go, libsql_heartbeat_repository.go, libsql_heartbeat_monitor_repository.go
- Remove TursoDB() method and tursoDB field from DI container
- Remove unused database/sql import from container
- Run go mod tidy to remove libsql-client-go dependency
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/go.mod | 4 -
api/go.sum | 8 -
api/pkg/di/container.go | 19 --
api/pkg/repositories/libsql.go | 67 ------
.../libsql_heartbeat_monitor_repository.go | 199 ------------------
.../libsql_heartbeat_repository.go | 186 ----------------
6 files changed, 483 deletions(-)
delete mode 100644 api/pkg/repositories/libsql.go
delete mode 100644 api/pkg/repositories/libsql_heartbeat_monitor_repository.go
delete mode 100644 api/pkg/repositories/libsql_heartbeat_repository.go
diff --git a/api/go.mod b/api/go.mod
index 24b70d57..de1338cd 100644
--- a/api/go.mod
+++ b/api/go.mod
@@ -44,7 +44,6 @@ require (
github.com/stretchr/testify v1.11.1
github.com/swaggo/swag v1.16.6
github.com/thedevsaddam/govalidator v1.9.10
- github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885
github.com/uptrace/uptrace-go v1.43.0
github.com/xuri/excelize/v2 v2.10.1
go.mongodb.org/mongo-driver/v2 v2.6.0
@@ -95,13 +94,11 @@ require (
github.com/PuerkitoBio/goquery v1.12.0 // indirect
github.com/andybalholm/brotli v1.2.1 // indirect
github.com/andybalholm/cascadia v1.3.3 // indirect
- github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/clipperhouse/displaywidth v0.11.0 // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect
- github.com/coder/websocket v1.8.12 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect
github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect
github.com/fatih/color v1.19.0 // indirect
@@ -191,7 +188,6 @@ require (
go.uber.org/zap v1.28.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.50.0 // indirect
- golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.35.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
diff --git a/api/go.sum b/api/go.sum
index d5fa9e1d..be12cc63 100644
--- a/api/go.sum
+++ b/api/go.sum
@@ -66,8 +66,6 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM=
github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA=
-github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
-github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k=
github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
@@ -90,8 +88,6 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4=
github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw=
github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0=
-github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo=
-github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -324,8 +320,6 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj
github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90=
github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44=
github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ=
-github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0=
-github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU=
github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U=
github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@@ -426,8 +420,6 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
-golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw=
-golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ=
golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go
index 94c2722e..c8b005d9 100644
--- a/api/pkg/di/container.go
+++ b/api/pkg/di/container.go
@@ -3,7 +3,6 @@ package di
import (
"context"
"crypto/tls"
- "database/sql"
"fmt"
"net/http"
"os"
@@ -84,7 +83,6 @@ type Container struct {
projectID string
db *gorm.DB
dedicatedDB *gorm.DB
- tursoDB *sql.DB
mongoClient *mongoDriver.Client
version string
app *fiber.App
@@ -295,23 +293,6 @@ func (container *Container) DedicatedDB() (db *gorm.DB) {
return container.dedicatedDB
}
-// TursoDB creates a *sql.DB connection to a Turso/libSQL database
-func (container *Container) TursoDB() *sql.DB {
- if container.tursoDB != nil {
- return container.tursoDB
- }
-
- container.logger.Debug("creating Turso *sql.DB connection")
-
- db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN"))
- if err != nil {
- container.logger.Fatal(err)
- }
-
- container.tursoDB = db
- return container.tursoDB
-}
-
// MongoDB creates a *mongo.Client connection to MongoDB Atlas
func (container *Container) MongoDB() *mongoDriver.Client {
if container.mongoClient != nil {
diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go
deleted file mode 100644
index 66ee8c0e..00000000
--- a/api/pkg/repositories/libsql.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package repositories
-
-import (
- "database/sql"
- "fmt"
-
- _ "github.com/tursodatabase/libsql-client-go/libsql" // libSQL database driver
-
- "github.com/palantir/stacktrace"
-)
-
-const (
- tableHeartbeats = "heartbeats"
- tableHeartbeatMonitors = "heartbeat_monitors"
-)
-
-// NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables
-func NewTursoDB(dsn string) (*sql.DB, error) {
- db, err := sql.Open("libsql", dsn)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database with DSN [%s]", dsn))
- }
-
- if err = db.Ping(); err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database with DSN [%s]", dsn))
- }
-
- if err = createTursoTables(db); err != nil {
- return nil, stacktrace.Propagate(err, "cannot create turso tables")
- }
-
- return db, nil
-}
-
-func createTursoTables(db *sql.DB) error {
- statements := []string{
- `CREATE TABLE IF NOT EXISTS ` + tableHeartbeats + ` (
- id TEXT PRIMARY KEY,
- owner TEXT NOT NULL,
- version TEXT NOT NULL,
- charging INTEGER NOT NULL DEFAULT 0,
- user_id TEXT NOT NULL,
- timestamp DATETIME NOT NULL
- )`,
- `CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON ` + tableHeartbeats + `(owner, timestamp)`,
- `CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON ` + tableHeartbeats + `(user_id)`,
- `CREATE TABLE IF NOT EXISTS ` + tableHeartbeatMonitors + ` (
- id TEXT PRIMARY KEY,
- phone_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- queue_id TEXT NOT NULL DEFAULT '',
- owner TEXT NOT NULL,
- phone_online INTEGER NOT NULL DEFAULT 1,
- created_at DATETIME NOT NULL,
- updated_at DATETIME NOT NULL
- )`,
- `CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON ` + tableHeartbeatMonitors + `(user_id, owner)`,
- }
-
- for _, stmt := range statements {
- if _, err := db.Exec(stmt); err != nil {
- return stacktrace.Propagate(err, fmt.Sprintf("cannot execute statement: %s", stmt))
- }
- }
-
- return nil
-}
diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go
deleted file mode 100644
index 0cb594af..00000000
--- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go
+++ /dev/null
@@ -1,199 +0,0 @@
-package repositories
-
-import (
- "context"
- "database/sql"
- "fmt"
- "time"
-
- "github.com/google/uuid"
-
- "github.com/NdoleStudio/httpsms/pkg/entities"
- "github.com/NdoleStudio/httpsms/pkg/telemetry"
- "github.com/palantir/stacktrace"
-)
-
-// libsqlHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in Turso/libSQL
-type libsqlHeartbeatMonitorRepository struct {
- logger telemetry.Logger
- tracer telemetry.Tracer
- db *sql.DB
-}
-
-// NewLibsqlHeartbeatMonitorRepository creates the libSQL version of the HeartbeatMonitorRepository
-func NewLibsqlHeartbeatMonitorRepository(
- logger telemetry.Logger,
- tracer telemetry.Tracer,
- db *sql.DB,
-) HeartbeatMonitorRepository {
- return &libsqlHeartbeatMonitorRepository{
- logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatMonitorRepository{})),
- tracer: tracer,
- db: db,
- }
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx,
- "INSERT INTO "+tableHeartbeatMonitors+" (id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
- monitor.ID.String(),
- monitor.PhoneID.String(),
- string(monitor.UserID),
- monitor.QueueID,
- monitor.Owner,
- boolToInt(monitor.PhoneOnline),
- monitor.CreatedAt.UTC(),
- monitor.UpdatedAt.UTC(),
- )
- if err != nil {
- msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- row := repository.db.QueryRowContext(ctx,
- "SELECT id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ? LIMIT 1",
- string(userID), phoneNumber,
- )
-
- monitor, err := repository.scanHeartbeatMonitorRow(row)
- if err == sql.ErrNoRows {
- msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
- }
- if err != nil {
- msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return monitor, nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- var count int
- err := repository.db.QueryRowContext(ctx,
- "SELECT COUNT(*) FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND id = ?",
- string(userID), monitorID.String(),
- ).Scan(&count)
- if err != nil {
- msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID)
- return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return count > 0, nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx,
- "UPDATE "+tableHeartbeatMonitors+" SET queue_id = ?, updated_at = ? WHERE id = ?",
- queueID, time.Now().UTC(), monitorID.String(),
- )
- if err != nil {
- msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx,
- "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ?",
- string(userID), phoneNumber,
- )
- if err != nil {
- msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx,
- "UPDATE "+tableHeartbeatMonitors+" SET phone_online = ?, updated_at = ? WHERE id = ? AND user_id = ?",
- boolToInt(online), time.Now().UTC(), monitorID.String(), string(userID),
- )
- if err != nil {
- msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID))
- if err != nil {
- msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatMonitorRepository) scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) {
- monitor := new(entities.HeartbeatMonitor)
- var id, phoneID, userID string
- var phoneOnline int
- err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt)
- if err != nil {
- return nil, err
- }
- monitor.ID, err = uuid.Parse(id)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", id))
- }
- monitor.PhoneID, err = uuid.Parse(phoneID)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", phoneID))
- }
- monitor.UserID = entities.UserID(userID)
- monitor.PhoneOnline = phoneOnline != 0
- return monitor, nil
-}
diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go
deleted file mode 100644
index 42fdf911..00000000
--- a/api/pkg/repositories/libsql_heartbeat_repository.go
+++ /dev/null
@@ -1,186 +0,0 @@
-package repositories
-
-import (
- "context"
- "database/sql"
- "fmt"
-
- "github.com/google/uuid"
-
- "github.com/NdoleStudio/httpsms/pkg/entities"
- "github.com/NdoleStudio/httpsms/pkg/telemetry"
- "github.com/palantir/stacktrace"
-)
-
-// libsqlHeartbeatRepository is responsible for persisting entities.Heartbeat in Turso/libSQL
-type libsqlHeartbeatRepository struct {
- logger telemetry.Logger
- tracer telemetry.Tracer
- db *sql.DB
-}
-
-// NewLibsqlHeartbeatRepository creates the libSQL version of the HeartbeatRepository
-func NewLibsqlHeartbeatRepository(
- logger telemetry.Logger,
- tracer telemetry.Tracer,
- db *sql.DB,
-) HeartbeatRepository {
- return &libsqlHeartbeatRepository{
- logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatRepository{})),
- tracer: tracer,
- db: db,
- }
-}
-
-func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error {
- ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx,
- "INSERT INTO "+tableHeartbeats+" (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
- heartbeat.ID.String(),
- heartbeat.Owner,
- heartbeat.Version,
- boolToInt(heartbeat.Charging),
- string(heartbeat.UserID),
- heartbeat.Timestamp.UTC(),
- )
- if err != nil {
- msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- var rows *sql.Rows
- var err error
-
- if len(params.Query) > 0 {
- queryPattern := "%" + params.Query + "%"
- rows, err = repository.db.QueryContext(ctx,
- "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? AND version LIKE ? ORDER BY timestamp DESC LIMIT ? OFFSET ?",
- string(userID), owner, queryPattern, params.Limit, params.Skip,
- )
- } else {
- rows, err = repository.db.QueryContext(ctx,
- "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?",
- string(userID), owner, params.Limit, params.Skip,
- )
- }
- if err != nil {
- msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
- defer rows.Close()
-
- heartbeats := make([]entities.Heartbeat, 0)
- for rows.Next() {
- heartbeat, scanErr := repository.scanHeartbeat(rows)
- if scanErr != nil {
- msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg))
- }
- heartbeats = append(heartbeats, *heartbeat)
- }
- if rowsErr := rows.Err(); rowsErr != nil {
- msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg))
- }
-
- return &heartbeats, nil
-}
-
-func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- row := repository.db.QueryRowContext(ctx,
- "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1",
- string(userID), owner,
- )
-
- heartbeat, err := repository.scanHeartbeatRow(row)
- if err == sql.ErrNoRows {
- msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
- }
- if err != nil {
- msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return heartbeat, nil
-}
-
-func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
- ctx, span := repository.tracer.Start(ctx)
- defer span.End()
-
- ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
- defer cancel()
-
- _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID))
- if err != nil {
- msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID)
- return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
- }
-
- return nil
-}
-
-func (repository *libsqlHeartbeatRepository) scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) {
- heartbeat := new(entities.Heartbeat)
- var id string
- var charging int
- var userID string
- err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp)
- if err != nil {
- return nil, err
- }
- heartbeat.ID, err = uuid.Parse(id)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id))
- }
- heartbeat.Charging = charging != 0
- heartbeat.UserID = entities.UserID(userID)
- return heartbeat, nil
-}
-
-func (repository *libsqlHeartbeatRepository) scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) {
- heartbeat := new(entities.Heartbeat)
- var id string
- var charging int
- var userID string
- err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp)
- if err != nil {
- return nil, err
- }
- heartbeat.ID, err = uuid.Parse(id)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id))
- }
- heartbeat.Charging = charging != 0
- heartbeat.UserID = entities.UserID(userID)
- return heartbeat, nil
-}
-
-func boolToInt(b bool) int {
- if b {
- return 1
- }
- return 0
-}
From 24f1fc246893a63c4d1c9b49caad06bd4c5ac1ae Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:14:17 +0300
Subject: [PATCH 03/10] refactor: use entity structs directly with bson tags
instead of document types
- Add bson struct tags to entities.Heartbeat and entities.HeartbeatMonitor
- Register custom UUID codec so uuid.UUID is stored as string _id in MongoDB
- Remove intermediate heartbeatDocument/heartbeatMonitorDocument structs
- MongoDB repositories now marshal/unmarshal entities directly
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/pkg/entities/heartbeat.go | 12 ++--
api/pkg/entities/heartbeat_monitor.go | 16 ++---
.../mongo_heartbeat_monitor_repository.go | 56 ++---------------
.../mongo_heartbeat_repository.go | 62 +++----------------
api/pkg/repositories/mongodb.go | 33 +++++++++-
5 files changed, 58 insertions(+), 121 deletions(-)
diff --git a/api/pkg/entities/heartbeat.go b/api/pkg/entities/heartbeat.go
index 629efd29..abc070e9 100644
--- a/api/pkg/entities/heartbeat.go
+++ b/api/pkg/entities/heartbeat.go
@@ -8,10 +8,10 @@ import (
// Heartbeat represents is a pulse from an active phone
type Heartbeat struct {
- ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
- Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" example:"+18005550199"`
- Version string `json:"version" example:"344c10f"`
- Charging bool `json:"charging" example:"true"`
- UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"`
- Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" example:"2022-06-05T14:26:01.520828+03:00"`
+ ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
+ Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" bson:"owner" example:"+18005550199"`
+ Version string `json:"version" bson:"version" example:"344c10f"`
+ Charging bool `json:"charging" bson:"charging" example:"true"`
+ UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"`
+ Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" bson:"timestamp" example:"2022-06-05T14:26:01.520828+03:00"`
}
diff --git a/api/pkg/entities/heartbeat_monitor.go b/api/pkg/entities/heartbeat_monitor.go
index 6b41a31a..7151f195 100644
--- a/api/pkg/entities/heartbeat_monitor.go
+++ b/api/pkg/entities/heartbeat_monitor.go
@@ -8,14 +8,14 @@ import (
// HeartbeatMonitor is used to monitor heartbeats of a phone
type HeartbeatMonitor struct {
- ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
- PhoneID uuid.UUID `json:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
- UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"`
- QueueID string `json:"queue_id" example:"0360259236613675274"`
- Owner string `json:"owner" example:"+18005550199"`
- PhoneOnline bool `json:"phone_online" example:"true" default:"true"`
- CreatedAt time.Time `json:"created_at" example:"2022-06-05T14:26:02.302718+03:00"`
- UpdatedAt time.Time `json:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"`
+ ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
+ PhoneID uuid.UUID `json:"phone_id" bson:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"`
+ UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"`
+ QueueID string `json:"queue_id" bson:"queue_id" example:"0360259236613675274"`
+ Owner string `json:"owner" bson:"owner" example:"+18005550199"`
+ PhoneOnline bool `json:"phone_online" bson:"phone_online" example:"true" default:"true"`
+ CreatedAt time.Time `json:"created_at" bson:"created_at" example:"2022-06-05T14:26:02.302718+03:00"`
+ UpdatedAt time.Time `json:"updated_at" bson:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"`
}
// RequiresCheck returns true if the heartbeat monitor requires a check
diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
index 1f5f014f..ae9a68cd 100644
--- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
@@ -34,17 +34,6 @@ func NewMongoHeartbeatMonitorRepository(
}
}
-type heartbeatMonitorDocument struct {
- ID string `bson:"_id"`
- PhoneID string `bson:"phone_id"`
- UserID string `bson:"user_id"`
- QueueID string `bson:"queue_id"`
- Owner string `bson:"owner"`
- PhoneOnline bool `bson:"phone_online"`
- CreatedAt time.Time `bson:"created_at"`
- UpdatedAt time.Time `bson:"updated_at"`
-}
-
func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error {
ctx, span := repository.tracer.Start(ctx)
defer span.End()
@@ -52,18 +41,7 @@ func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, mo
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
defer cancel()
- doc := heartbeatMonitorDocument{
- ID: monitor.ID.String(),
- PhoneID: monitor.PhoneID.String(),
- UserID: string(monitor.UserID),
- QueueID: monitor.QueueID,
- Owner: monitor.Owner,
- PhoneOnline: monitor.PhoneOnline,
- CreatedAt: monitor.CreatedAt.UTC(),
- UpdatedAt: monitor.UpdatedAt.UTC(),
- }
-
- _, err := repository.collection.InsertOne(ctx, doc)
+ _, err := repository.collection.InsertOne(ctx, monitor)
if err != nil {
msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID)
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -84,8 +62,8 @@ func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, use
{"owner", phoneNumber},
}
- var doc heartbeatMonitorDocument
- err := repository.collection.FindOne(ctx, filter).Decode(&doc)
+ var monitor entities.HeartbeatMonitor
+ err := repository.collection.FindOne(ctx, filter).Decode(&monitor)
if err == mongo.ErrNoDocuments {
msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber)
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
@@ -95,12 +73,7 @@ func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, use
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
}
- monitor, err := docToHeartbeatMonitor(doc)
- if err != nil {
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat monitor document"))
- }
-
- return monitor, nil
+ return &monitor, nil
}
func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) {
@@ -207,24 +180,3 @@ func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context.
return nil
}
-
-func docToHeartbeatMonitor(doc heartbeatMonitorDocument) (*entities.HeartbeatMonitor, error) {
- id, err := uuid.Parse(doc.ID)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", doc.ID))
- }
- phoneID, err := uuid.Parse(doc.PhoneID)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", doc.PhoneID))
- }
- return &entities.HeartbeatMonitor{
- ID: id,
- PhoneID: phoneID,
- UserID: entities.UserID(doc.UserID),
- QueueID: doc.QueueID,
- Owner: doc.Owner,
- PhoneOnline: doc.PhoneOnline,
- CreatedAt: doc.CreatedAt,
- UpdatedAt: doc.UpdatedAt,
- }, nil
-}
diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go
index 13802e06..68ea6e5f 100644
--- a/api/pkg/repositories/mongo_heartbeat_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_repository.go
@@ -3,9 +3,7 @@ package repositories
import (
"context"
"fmt"
- "time"
- "github.com/google/uuid"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
@@ -35,15 +33,6 @@ func NewMongoHeartbeatRepository(
}
}
-type heartbeatDocument struct {
- ID string `bson:"_id"`
- Owner string `bson:"owner"`
- Version string `bson:"version"`
- Charging bool `bson:"charging"`
- UserID string `bson:"user_id"`
- Timestamp time.Time `bson:"timestamp"`
-}
-
func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error {
ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger)
defer span.End()
@@ -51,16 +40,7 @@ func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat
ctx, cancel := context.WithTimeout(ctx, dbOperationDuration)
defer cancel()
- doc := heartbeatDocument{
- ID: heartbeat.ID.String(),
- Owner: heartbeat.Owner,
- Version: heartbeat.Version,
- Charging: heartbeat.Charging,
- UserID: string(heartbeat.UserID),
- Timestamp: heartbeat.Timestamp.UTC(),
- }
-
- _, err := repository.collection.InsertOne(ctx, doc)
+ _, err := repository.collection.InsertOne(ctx, heartbeat)
if err != nil {
msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID)
return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
@@ -97,20 +77,14 @@ func (repository *mongoHeartbeatRepository) Index(ctx context.Context, userID en
}
defer cursor.Close(ctx)
- var docs []heartbeatDocument
- if err = cursor.All(ctx, &docs); err != nil {
+ var heartbeats []entities.Heartbeat
+ if err = cursor.All(ctx, &heartbeats); err != nil {
msg := fmt.Sprintf("cannot decode heartbeats for owner [%s]", owner)
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
}
- heartbeats := make([]entities.Heartbeat, 0, len(docs))
- for _, doc := range docs {
- hb, convertErr := docToHeartbeat(doc)
- if convertErr != nil {
- msg := fmt.Sprintf("cannot convert heartbeat document for owner [%s]", owner)
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(convertErr, msg))
- }
- heartbeats = append(heartbeats, *hb)
+ if heartbeats == nil {
+ heartbeats = make([]entities.Heartbeat, 0)
}
return &heartbeats, nil
@@ -130,8 +104,8 @@ func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID ent
opts := options.FindOne().SetSort(bson.D{{"timestamp", -1}})
- var doc heartbeatDocument
- err := repository.collection.FindOne(ctx, filter, opts).Decode(&doc)
+ var heartbeat entities.Heartbeat
+ err := repository.collection.FindOne(ctx, filter, opts).Decode(&heartbeat)
if err == mongo.ErrNoDocuments {
msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner)
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg))
@@ -141,12 +115,7 @@ func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID ent
return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg))
}
- heartbeat, err := docToHeartbeat(doc)
- if err != nil {
- return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat document"))
- }
-
- return heartbeat, nil
+ return &heartbeat, nil
}
func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error {
@@ -164,18 +133,3 @@ func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context
return nil
}
-
-func docToHeartbeat(doc heartbeatDocument) (*entities.Heartbeat, error) {
- id, err := uuid.Parse(doc.ID)
- if err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", doc.ID))
- }
- return &entities.Heartbeat{
- ID: id,
- Owner: doc.Owner,
- Version: doc.Version,
- Charging: doc.Charging,
- UserID: entities.UserID(doc.UserID),
- Timestamp: doc.Timestamp,
- }, nil
-}
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
index 80328bed..e991e13b 100644
--- a/api/pkg/repositories/mongodb.go
+++ b/api/pkg/repositories/mongodb.go
@@ -3,8 +3,10 @@ package repositories
import (
"context"
"fmt"
+ "reflect"
"time"
+ "github.com/google/uuid"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
@@ -18,13 +20,42 @@ const (
collectionHeartbeatMonitors = "heartbeat_monitors"
)
+// uuidEncodeValue encodes uuid.UUID as a BSON string
+func uuidEncodeValue(_ bson.EncodeContext, vw bson.ValueWriter, val reflect.Value) error {
+ u := val.Interface().(uuid.UUID)
+ return vw.WriteString(u.String())
+}
+
+// uuidDecodeValue decodes a BSON string into uuid.UUID
+func uuidDecodeValue(_ bson.DecodeContext, vr bson.ValueReader, val reflect.Value) error {
+ str, err := vr.ReadString()
+ if err != nil {
+ return err
+ }
+ parsed, err := uuid.Parse(str)
+ if err != nil {
+ return err
+ }
+ val.Set(reflect.ValueOf(parsed))
+ return nil
+}
+
+// newMongoRegistry creates a BSON registry that encodes uuid.UUID as strings
+func newMongoRegistry() *bson.Registry {
+ rb := bson.NewRegistry()
+ rb.RegisterTypeEncoder(reflect.TypeOf(uuid.UUID{}), bson.ValueEncoderFunc(uuidEncodeValue))
+ rb.RegisterTypeDecoder(reflect.TypeOf(uuid.UUID{}), bson.ValueDecoderFunc(uuidDecodeValue))
+ return rb
+}
+
// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes
func NewMongoDB(uri string) (*mongo.Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
- opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI)
+ registry := newMongoRegistry()
+ opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI).SetRegistry(registry)
client, err := mongo.Connect(opts)
if err != nil {
From 6a92f6ca8a21e8d94aac31265c41e7f5e99f9eb1 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:17:58 +0300
Subject: [PATCH 04/10] refactor: derive MongoDB database name from URI appName
parameter
- Remove hardcoded mongoDBName constant
- Parse appName query parameter from MONGODB_URI as the database name
- NewMongoDB now returns (client, dbName, error)
- Repository constructors accept dbName parameter
- DI container caches and passes the DB name to repositories
- Update test .env to include appName=httpsms in URI
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/pkg/di/container.go | 18 ++++++--
.../mongo_heartbeat_monitor_repository.go | 3 +-
.../mongo_heartbeat_repository.go | 3 +-
api/pkg/repositories/mongodb.go | 41 ++++++++++++++-----
tests/.env.test | 2 +-
5 files changed, 51 insertions(+), 16 deletions(-)
diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go
index c8b005d9..4fbde132 100644
--- a/api/pkg/di/container.go
+++ b/api/pkg/di/container.go
@@ -84,6 +84,7 @@ type Container struct {
db *gorm.DB
dedicatedDB *gorm.DB
mongoClient *mongoDriver.Client
+ mongoDBName string
version string
app *fiber.App
eventDispatcher *services.EventDispatcher
@@ -301,15 +302,24 @@ func (container *Container) MongoDB() *mongoDriver.Client {
container.logger.Debug("creating MongoDB *mongo.Client connection")
- client, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
+ client, dbName, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
if err != nil {
container.logger.Fatal(err)
}
container.mongoClient = client
+ container.mongoDBName = dbName
return container.mongoClient
}
+// MongoDBName returns the MongoDB database name derived from the connection URI appName
+func (container *Container) MongoDBName() string {
+ if container.mongoClient == nil {
+ container.MongoDB()
+ }
+ return container.mongoDBName
+}
+
// HedgingFailureCounter creates an OTel counter for hedging secondary write failures
func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter {
meter := otel.GetMeterProvider().Meter(
@@ -928,6 +938,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie
container.Logger(),
container.Tracer(),
container.MongoDB(),
+ container.MongoDBName(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository")
@@ -935,7 +946,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()),
+ repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()),
container.HedgingFailureCounter(),
)
default:
@@ -1766,6 +1777,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito
container.Logger(),
container.Tracer(),
container.MongoDB(),
+ container.MongoDBName(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatRepository")
@@ -1773,7 +1785,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()),
+ repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()),
container.HedgingFailureCounter(),
)
default:
diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
index ae9a68cd..5d9541dd 100644
--- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
@@ -26,11 +26,12 @@ func NewMongoHeartbeatMonitorRepository(
logger telemetry.Logger,
tracer telemetry.Tracer,
client *mongo.Client,
+ dbName string,
) HeartbeatMonitorRepository {
return &mongoHeartbeatMonitorRepository{
logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})),
tracer: tracer,
- collection: client.Database(mongoDBName).Collection(collectionHeartbeatMonitors),
+ collection: client.Database(dbName).Collection(collectionHeartbeatMonitors),
}
}
diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go
index 68ea6e5f..ed2f8d8a 100644
--- a/api/pkg/repositories/mongo_heartbeat_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_repository.go
@@ -25,11 +25,12 @@ func NewMongoHeartbeatRepository(
logger telemetry.Logger,
tracer telemetry.Tracer,
client *mongo.Client,
+ dbName string,
) HeartbeatRepository {
return &mongoHeartbeatRepository{
logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})),
tracer: tracer,
- collection: client.Database(mongoDBName).Collection(collectionHeartbeats),
+ collection: client.Database(dbName).Collection(collectionHeartbeats),
}
}
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
index e991e13b..aeb24a0f 100644
--- a/api/pkg/repositories/mongodb.go
+++ b/api/pkg/repositories/mongodb.go
@@ -3,6 +3,7 @@ package repositories
import (
"context"
"fmt"
+ "net/url"
"reflect"
"time"
@@ -15,7 +16,6 @@ import (
)
const (
- mongoDBName = "httpsms"
collectionHeartbeats = "heartbeats"
collectionHeartbeatMonitors = "heartbeat_monitors"
)
@@ -48,8 +48,14 @@ func newMongoRegistry() *bson.Registry {
return rb
}
-// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes
-func NewMongoDB(uri string) (*mongo.Client, error) {
+// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes.
+// The database name is derived from the appName query parameter in the URI.
+func NewMongoDB(uri string) (*mongo.Client, string, error) {
+ dbName, err := parseMongoDBName(uri)
+ if err != nil {
+ return nil, "", stacktrace.Propagate(err, "cannot parse database name from MongoDB URI")
+ }
+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
@@ -59,22 +65,37 @@ func NewMongoDB(uri string) (*mongo.Client, error) {
client, err := mongo.Connect(opts)
if err != nil {
- return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
+ return nil, "", stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
}
if err = client.Ping(ctx, nil); err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
+ return nil, "", stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
+ }
+
+ if err = createMongoIndexes(ctx, client, dbName); err != nil {
+ return nil, "", stacktrace.Propagate(err, "cannot create MongoDB indexes")
+ }
+
+ return client, dbName, nil
+}
+
+// parseMongoDBName extracts the appName query parameter from the MongoDB URI to use as the database name
+func parseMongoDBName(uri string) (string, error) {
+ parsed, err := url.Parse(uri)
+ if err != nil {
+ return "", stacktrace.Propagate(err, fmt.Sprintf("cannot parse MongoDB URI [%s]", uri))
}
- if err = createMongoIndexes(ctx, client); err != nil {
- return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes")
+ appName := parsed.Query().Get("appName")
+ if appName == "" {
+ return "", stacktrace.NewError("MongoDB URI is missing the 'appName' query parameter which is used as the database name")
}
- return client, nil
+ return appName, nil
}
-func createMongoIndexes(ctx context.Context, client *mongo.Client) error {
- db := client.Database(mongoDBName)
+func createMongoIndexes(ctx context.Context, client *mongo.Client, dbName string) error {
+ db := client.Database(dbName)
// Heartbeats indexes
heartbeatsCol := db.Collection(collectionHeartbeats)
diff --git a/tests/.env.test b/tests/.env.test
index 3cec6a66..ac18af92 100644
--- a/tests/.env.test
+++ b/tests/.env.test
@@ -29,4 +29,4 @@ GCS_BUCKET_NAME=
UPTRACE_DSN=
CLOUDFLARE_TURNSTILE_SECRET_KEY=
HEARTBEAT_DB_BACKEND=hedging
-MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin
+MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin&appName=httpsms
From fad8b324f1b7c0d835e49646b664f66214716b34 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:20:14 +0300
Subject: [PATCH 05/10] refactor: pass *mongo.Database directly to repository
constructors
- NewMongoDB now returns *mongo.Database instead of (*mongo.Client, dbName)
- Repository constructors accept *mongo.Database instead of client + dbName
- DI container caches the *mongo.Database singleton directly
- Removes MongoDBName() helper - no longer needed
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/pkg/di/container.go | 34 ++++++-------------
.../mongo_heartbeat_monitor_repository.go | 5 ++-
.../mongo_heartbeat_repository.go | 5 ++-
api/pkg/repositories/mongodb.go | 22 ++++++------
4 files changed, 26 insertions(+), 40 deletions(-)
diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go
index 4fbde132..e9ca3718 100644
--- a/api/pkg/di/container.go
+++ b/api/pkg/di/container.go
@@ -83,8 +83,7 @@ type Container struct {
projectID string
db *gorm.DB
dedicatedDB *gorm.DB
- mongoClient *mongoDriver.Client
- mongoDBName string
+ mongoDB *mongoDriver.Database
version string
app *fiber.App
eventDispatcher *services.EventDispatcher
@@ -294,30 +293,21 @@ func (container *Container) DedicatedDB() (db *gorm.DB) {
return container.dedicatedDB
}
-// MongoDB creates a *mongo.Client connection to MongoDB Atlas
-func (container *Container) MongoDB() *mongoDriver.Client {
- if container.mongoClient != nil {
- return container.mongoClient
+// MongoDB creates a *mongo.Database connection to MongoDB Atlas
+func (container *Container) MongoDB() *mongoDriver.Database {
+ if container.mongoDB != nil {
+ return container.mongoDB
}
- container.logger.Debug("creating MongoDB *mongo.Client connection")
+ container.logger.Debug("creating MongoDB *mongo.Database connection")
- client, dbName, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
+ db, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI"))
if err != nil {
container.logger.Fatal(err)
}
- container.mongoClient = client
- container.mongoDBName = dbName
- return container.mongoClient
-}
-
-// MongoDBName returns the MongoDB database name derived from the connection URI appName
-func (container *Container) MongoDBName() string {
- if container.mongoClient == nil {
- container.MongoDB()
- }
- return container.mongoDBName
+ container.mongoDB = db
+ return container.mongoDB
}
// HedgingFailureCounter creates an OTel counter for hedging secondary write failures
@@ -938,7 +928,6 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie
container.Logger(),
container.Tracer(),
container.MongoDB(),
- container.MongoDBName(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository")
@@ -946,7 +935,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()),
+ repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()),
container.HedgingFailureCounter(),
)
default:
@@ -1777,7 +1766,6 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito
container.Logger(),
container.Tracer(),
container.MongoDB(),
- container.MongoDBName(),
)
case "hedging":
container.logger.Debug("creating hedging repositories.HeartbeatRepository")
@@ -1785,7 +1773,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito
container.Logger(),
container.Tracer(),
repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()),
- repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()),
+ repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()),
container.HedgingFailureCounter(),
)
default:
diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
index 5d9541dd..13200c07 100644
--- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go
@@ -25,13 +25,12 @@ type mongoHeartbeatMonitorRepository struct {
func NewMongoHeartbeatMonitorRepository(
logger telemetry.Logger,
tracer telemetry.Tracer,
- client *mongo.Client,
- dbName string,
+ db *mongo.Database,
) HeartbeatMonitorRepository {
return &mongoHeartbeatMonitorRepository{
logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})),
tracer: tracer,
- collection: client.Database(dbName).Collection(collectionHeartbeatMonitors),
+ collection: db.Collection(collectionHeartbeatMonitors),
}
}
diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go
index ed2f8d8a..d8a7839c 100644
--- a/api/pkg/repositories/mongo_heartbeat_repository.go
+++ b/api/pkg/repositories/mongo_heartbeat_repository.go
@@ -24,13 +24,12 @@ type mongoHeartbeatRepository struct {
func NewMongoHeartbeatRepository(
logger telemetry.Logger,
tracer telemetry.Tracer,
- client *mongo.Client,
- dbName string,
+ db *mongo.Database,
) HeartbeatRepository {
return &mongoHeartbeatRepository{
logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})),
tracer: tracer,
- collection: client.Database(dbName).Collection(collectionHeartbeats),
+ collection: db.Collection(collectionHeartbeats),
}
}
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
index aeb24a0f..bbad0c07 100644
--- a/api/pkg/repositories/mongodb.go
+++ b/api/pkg/repositories/mongodb.go
@@ -48,12 +48,12 @@ func newMongoRegistry() *bson.Registry {
return rb
}
-// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes.
+// NewMongoDB creates a new *mongo.Database connection to MongoDB Atlas and ensures indexes.
// The database name is derived from the appName query parameter in the URI.
-func NewMongoDB(uri string) (*mongo.Client, string, error) {
+func NewMongoDB(uri string) (*mongo.Database, error) {
dbName, err := parseMongoDBName(uri)
if err != nil {
- return nil, "", stacktrace.Propagate(err, "cannot parse database name from MongoDB URI")
+ return nil, stacktrace.Propagate(err, "cannot parse database name from MongoDB URI")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -65,18 +65,20 @@ func NewMongoDB(uri string) (*mongo.Client, string, error) {
client, err := mongo.Connect(opts)
if err != nil {
- return nil, "", stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
+ return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
}
if err = client.Ping(ctx, nil); err != nil {
- return nil, "", stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
+ return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
}
- if err = createMongoIndexes(ctx, client, dbName); err != nil {
- return nil, "", stacktrace.Propagate(err, "cannot create MongoDB indexes")
+ db := client.Database(dbName)
+
+ if err = createMongoIndexes(ctx, db); err != nil {
+ return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes")
}
- return client, dbName, nil
+ return db, nil
}
// parseMongoDBName extracts the appName query parameter from the MongoDB URI to use as the database name
@@ -94,9 +96,7 @@ func parseMongoDBName(uri string) (string, error) {
return appName, nil
}
-func createMongoIndexes(ctx context.Context, client *mongo.Client, dbName string) error {
- db := client.Database(dbName)
-
+func createMongoIndexes(ctx context.Context, db *mongo.Database) error {
// Heartbeats indexes
heartbeatsCol := db.Collection(collectionHeartbeats)
_, err := heartbeatsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{
From 563453e07df41f21715f400cbdaa5094ad8d39c9 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:20:43 +0300
Subject: [PATCH 06/10] fix: correct outgoing message queue docs URL in
settings
Fix typo in URL: outgiong -> outgoing
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
web/pages/settings/index.vue | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue
index 877565ec..2b4348ca 100644
--- a/web/pages/settings/index.vue
+++ b/web/pages/settings/index.vue
@@ -443,7 +443,7 @@
Documentation
From 1d291f313c19b31b5177ffe85328d5e039d4ecb3 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:26:24 +0300
Subject: [PATCH 07/10] feat: add OpenTelemetry tracing to MongoDB client
Use otelmongo.NewMonitor() as the command monitor on the MongoDB
client to automatically trace all database operations.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/go.mod | 7 ++++---
api/go.sum | 14 ++++++++------
api/pkg/repositories/mongodb.go | 10 +++++++---
3 files changed, 19 insertions(+), 12 deletions(-)
diff --git a/api/go.mod b/api/go.mod
index de1338cd..1fe7dca1 100644
--- a/api/go.mod
+++ b/api/go.mod
@@ -47,6 +47,7 @@ require (
github.com/uptrace/uptrace-go v1.43.0
github.com/xuri/excelize/v2 v2.10.1
go.mongodb.org/mongo-driver/v2 v2.6.0
+ go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
@@ -187,12 +188,12 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.28.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
- golang.org/x/crypto v0.50.0 // indirect
+ golang.org/x/crypto v0.51.0 // indirect
golang.org/x/mod v0.35.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
- golang.org/x/sys v0.43.0 // indirect
- golang.org/x/text v0.36.0 // indirect
+ golang.org/x/sys v0.44.0 // indirect
+ golang.org/x/text v0.37.0 // indirect
golang.org/x/time v0.15.0 // indirect
golang.org/x/tools v0.44.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
diff --git a/api/go.sum b/api/go.sum
index be12cc63..fa1163a2 100644
--- a/api/go.sum
+++ b/api/go.sum
@@ -359,6 +359,8 @@ go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ
go.opentelemetry.io/contrib v1.43.0/go.mod h1:JYdNU7Pl/2ckKMGp8/G7zeyhEbtRmy9Q8bcrtv75Znk=
go.opentelemetry.io/contrib/detectors/gcp v1.43.0 h1:62yY3dT7/ShwOxzA0RsKRgshBmfElKI4d/Myu2OxDFU=
go.opentelemetry.io/contrib/detectors/gcp v1.43.0/go.mod h1:RyaZMFY7yi1kAs45S6mbFGz8O8rqB0dTY14uzvG4LCs=
+go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e h1:OX282aWfZNOrSVUPF59HlRhyA+MDcyi4kI8WWXt6A8I=
+go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e/go.mod h1:lw7VQzmNsmkZBRQqOQiREGxO3GtzG/pOVEmKufablmA=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 h1:0Qx7VGBacMm9ZENQ7TnNObTYI4ShC+lHI16seduaxZo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0/go.mod h1:Sje3i3MjSPKTSPvVWCaL8ugBzJwik3u4smCjUeuupqg=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY=
@@ -418,8 +420,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
-golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
-golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
+golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
+golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ=
golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -464,8 +466,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
-golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
-golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
+golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
+golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -485,8 +487,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
-golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg=
-golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
+golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
+golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
index bbad0c07..bb884148 100644
--- a/api/pkg/repositories/mongodb.go
+++ b/api/pkg/repositories/mongodb.go
@@ -8,11 +8,11 @@ import (
"time"
"github.com/google/uuid"
+ "github.com/palantir/stacktrace"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
-
- "github.com/palantir/stacktrace"
+ "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo"
)
const (
@@ -61,7 +61,11 @@ func NewMongoDB(uri string) (*mongo.Database, error) {
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
registry := newMongoRegistry()
- opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI).SetRegistry(registry)
+ opts := options.Client().
+ ApplyURI(uri).
+ SetServerAPIOptions(serverAPI).
+ SetRegistry(registry).
+ SetMonitor(otelmongo.NewMonitor())
client, err := mongo.Connect(opts)
if err != nil {
From cdac79a69f201ec9b5e8f766dc00522d322f5fa6 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:28:30 +0300
Subject: [PATCH 08/10] fix: address PR review comments
- Remove MongoDB URI from Ping error message to prevent credential
exposure in logs and error aggregators
- Use separate contexts for Ping (10s) and index creation (30s) so
they don't share a timeout budget
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
api/pkg/repositories/mongodb.go | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go
index bb884148..f2f65ea9 100644
--- a/api/pkg/repositories/mongodb.go
+++ b/api/pkg/repositories/mongodb.go
@@ -56,8 +56,8 @@ func NewMongoDB(uri string) (*mongo.Database, error) {
return nil, stacktrace.Propagate(err, "cannot parse database name from MongoDB URI")
}
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
+ pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer pingCancel()
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
registry := newMongoRegistry()
@@ -72,13 +72,16 @@ func NewMongoDB(uri string) (*mongo.Database, error) {
return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas")
}
- if err = client.Ping(ctx, nil); err != nil {
- return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri))
+ if err = client.Ping(pingCtx, nil); err != nil {
+ return nil, stacktrace.Propagate(err, "cannot ping MongoDB Atlas")
}
db := client.Database(dbName)
- if err = createMongoIndexes(ctx, db); err != nil {
+ indexCtx, indexCancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer indexCancel()
+
+ if err = createMongoIndexes(indexCtx, db); err != nil {
return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes")
}
From e0f4d241a88c8d1fe97bfb30f41cee672c518049 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:31:31 +0300
Subject: [PATCH 09/10] fix: update send schedule docs link to correct URL
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
web/pages/settings/index.vue | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue
index 2b4348ca..96aa83f3 100644
--- a/web/pages/settings/index.vue
+++ b/web/pages/settings/index.vue
@@ -385,7 +385,7 @@
delivered when the schedule opens according to your
configured send rate.
From 6e45a28d161b37d3629ec916a1e0b93f17eac997 Mon Sep 17 00:00:00 2001
From: Acho Arnold
Date: Sat, 16 May 2026 13:37:57 +0300
Subject: [PATCH 10/10] fix: update CI workflow to check MongoDB instead of
sqld
Replace the sqld health check with a MongoDB mongosh ping check
since the test infrastructure now uses mongo:7 instead of sqld.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
---
.github/workflows/api.yml | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml
index e8044d5c..849f5ec8 100644
--- a/.github/workflows/api.yml
+++ b/.github/workflows/api.yml
@@ -37,18 +37,18 @@ jobs:
- name: Wait for services to be healthy
working-directory: ./tests
run: |
- echo "Waiting for sqld to be healthy..."
+ echo "Waiting for MongoDB to be healthy..."
for i in $(seq 1 20); do
- if curl -sf http://localhost:8090/health >/dev/null 2>&1; then
- echo "sqld is healthy!"
+ if docker compose exec mongodb mongosh --eval "db.runCommand('ping').ok" --quiet >/dev/null 2>&1; then
+ echo "MongoDB is healthy!"
break
fi
if [ $i -eq 20 ]; then
- echo "sqld failed to become healthy"
- docker compose logs sqld
+ echo "MongoDB failed to become healthy"
+ docker compose logs mongodb
exit 1
fi
- echo "sqld attempt $i/20 - waiting 3s..."
+ echo "MongoDB attempt $i/20 - waiting 3s..."
sleep 3
done