From f944650b3a9e79ea5f6c68bab282a15f25926e2a Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:07:44 +0300 Subject: [PATCH 01/10] feat: replace libSQL/Turso with MongoDB Atlas for heartbeat storage - Add MongoDB Go driver v2 repository implementations for HeartbeatRepository and HeartbeatMonitorRepository interfaces - Create mongodb.go connection helper with Atlas support and index creation - Update DI container to wire MongoDB as the hedging secondary backend - Replace 'turso' case with 'mongodb' case for standalone MongoDB usage - Update integration test docker-compose to use mongo:7 instead of sqld - Update .env.test with MongoDB connection string HEARTBEAT_DB_BACKEND options: - 'hedging': PostgreSQL primary, MongoDB secondary (fail-open writes) - 'mongodb': MongoDB only - default: PostgreSQL only (GORM) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/go.mod | 5 + api/go.sum | 10 + api/pkg/di/container.go | 39 ++- .../mongo_heartbeat_monitor_repository.go | 230 ++++++++++++++++++ .../mongo_heartbeat_repository.go | 181 ++++++++++++++ api/pkg/repositories/mongodb.go | 68 ++++++ tests/.env.test | 2 +- tests/docker-compose.yml | 19 +- web/pages/settings/index.vue | 16 +- 9 files changed, 550 insertions(+), 20 deletions(-) create mode 100644 api/pkg/repositories/mongo_heartbeat_monitor_repository.go create mode 100644 api/pkg/repositories/mongo_heartbeat_repository.go create mode 100644 api/pkg/repositories/mongodb.go diff --git a/api/go.mod b/api/go.mod index d77c0bf6..24b70d57 100644 --- a/api/go.mod +++ b/api/go.mod @@ -47,6 +47,7 @@ require ( github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 + go.mongodb.org/mongo-driver/v2 v2.6.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 @@ -164,8 +165,12 @@ require ( github.com/valyala/fasthttp v1.71.0 // indirect github.com/vanng822/css v1.0.1 // indirect github.com/vanng822/go-premailer v1.33.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.2.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xuri/efp v0.0.1 // indirect github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib v1.43.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.43.0 // indirect diff --git a/api/go.sum b/api/go.sum index 0710b094..d5fa9e1d 100644 --- a/api/go.sum +++ b/api/go.sum @@ -336,6 +336,12 @@ github.com/vanng822/css v1.0.1 h1:10yiXc4e8NI8ldU6mSrWmSWMuyWgPr9DZ63RSlsgDw8= github.com/vanng822/css v1.0.1/go.mod h1:tcnB1voG49QhCrwq1W0w5hhGasvOg+VQp9i9H1rCM1w= github.com/vanng822/go-premailer v1.33.0 h1:nglIpKn/7e3kIAwYByiH5xpauFur7RwAucqyZ59hcic= github.com/vanng822/go-premailer v1.33.0/go.mod h1:LGYI7ym6FQ7KcHN16LiQRF+tlan7qwhP1KEhpTINFpo= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.2.0 h1:bYKF2AEwG5rqd1BumT4gAnvwU/M9nBp2pTSxeZw7Wvs= +github.com/xdg-go/scram v1.2.0/go.mod h1:3dlrS0iBaWKYVt2ZfA4cj48umJZ+cAEbR6/SjLA88I8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xuri/efp v0.0.1 h1:fws5Rv3myXyYni8uwj2qKjVaRP30PdjeYe2Y6FDsCL8= github.com/xuri/efp v0.0.1/go.mod h1:ybY/Jr0T0GTCnYjKqmdwxyxn2BQf2RcQIIvex5QldPI= github.com/xuri/excelize/v2 v2.10.1 h1:V62UlqopMqha3kOpnlHy2CcRVw1V8E63jFoWUmMzxN0= @@ -344,11 +350,15 @@ github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9 h1:+C0TIdyyYmzadGaL/HBL github.com/xuri/nfp v0.0.2-0.20250530014748-2ddeb826f9a9/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yuin/goldmark v1.8.2 h1:kEGpgqJXdgbkhcOgBxkC0X0PmoPG1ZyoZ117rDVp4zE= github.com/yuin/goldmark v1.8.2/go.mod h1:ip/1k0VRfGynBgxOz0yCqHrbZXhcjxyuS66Brc7iBKg= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= +go.mongodb.org/mongo-driver/v2 v2.6.0 h1:b9sJOYrkmt4l8bY43ZenFBcPlhYIjaOfYHLtbB/5qi8= +go.mongodb.org/mongo-driver/v2 v2.6.0/go.mod h1:yOI9kBsufol30iFsl1slpdq1I0eHPzybRWdyYUs8K/0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ3U= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index ce6a82e2..94c2722e 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -74,6 +74,7 @@ import ( "github.com/NdoleStudio/httpsms/pkg/handlers" "github.com/NdoleStudio/httpsms/pkg/telemetry" "github.com/NdoleStudio/httpsms/pkg/validators" + mongoDriver "go.mongodb.org/mongo-driver/v2/mongo" "gorm.io/driver/postgres" gormLogger "gorm.io/gorm/logger" ) @@ -84,6 +85,7 @@ type Container struct { db *gorm.DB dedicatedDB *gorm.DB tursoDB *sql.DB + mongoClient *mongoDriver.Client version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -310,6 +312,23 @@ func (container *Container) TursoDB() *sql.DB { return container.tursoDB } +// MongoDB creates a *mongo.Client connection to MongoDB Atlas +func (container *Container) MongoDB() *mongoDriver.Client { + if container.mongoClient != nil { + return container.mongoClient + } + + container.logger.Debug("creating MongoDB *mongo.Client connection") + + client, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) + if err != nil { + container.logger.Fatal(err) + } + + container.mongoClient = client + return container.mongoClient +} + // HedgingFailureCounter creates an OTel counter for hedging secondary write failures func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { meter := otel.GetMeterProvider().Meter( @@ -922,12 +941,12 @@ func (container *Container) MessageThreadRepository() (repository repositories.M // HeartbeatMonitorRepository creates a new instance of repositories.HeartbeatMonitorRepository func (container *Container) HeartbeatMonitorRepository() (repository repositories.HeartbeatMonitorRepository) { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "turso": - container.logger.Debug("creating libSQL repositories.HeartbeatMonitorRepository") - return repositories.NewLibsqlHeartbeatMonitorRepository( + case "mongodb": + container.logger.Debug("creating MongoDB repositories.HeartbeatMonitorRepository") + return repositories.NewMongoHeartbeatMonitorRepository( container.Logger(), container.Tracer(), - container.TursoDB(), + container.MongoDB(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") @@ -935,7 +954,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewLibsqlHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.TursoDB()), + repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: @@ -1760,12 +1779,12 @@ func (container *Container) RegisterSwaggerRoutes() { // HeartbeatRepository registers a new instance of repositories.HeartbeatRepository func (container *Container) HeartbeatRepository() repositories.HeartbeatRepository { switch os.Getenv("HEARTBEAT_DB_BACKEND") { - case "turso": - container.logger.Debug("creating libSQL repositories.HeartbeatRepository") - return repositories.NewLibsqlHeartbeatRepository( + case "mongodb": + container.logger.Debug("creating MongoDB repositories.HeartbeatRepository") + return repositories.NewMongoHeartbeatRepository( container.Logger(), container.Tracer(), - container.TursoDB(), + container.MongoDB(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatRepository") @@ -1773,7 +1792,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewLibsqlHeartbeatRepository(container.Logger(), container.Tracer(), container.TursoDB()), + repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go new file mode 100644 index 00000000..1f5f014f --- /dev/null +++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go @@ -0,0 +1,230 @@ +package repositories + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// mongoHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in MongoDB +type mongoHeartbeatMonitorRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + collection *mongo.Collection +} + +// NewMongoHeartbeatMonitorRepository creates the MongoDB version of the HeartbeatMonitorRepository +func NewMongoHeartbeatMonitorRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + client *mongo.Client, +) HeartbeatMonitorRepository { + return &mongoHeartbeatMonitorRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})), + tracer: tracer, + collection: client.Database(mongoDBName).Collection(collectionHeartbeatMonitors), + } +} + +type heartbeatMonitorDocument struct { + ID string `bson:"_id"` + PhoneID string `bson:"phone_id"` + UserID string `bson:"user_id"` + QueueID string `bson:"queue_id"` + Owner string `bson:"owner"` + PhoneOnline bool `bson:"phone_online"` + CreatedAt time.Time `bson:"created_at"` + UpdatedAt time.Time `bson:"updated_at"` +} + +func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + doc := heartbeatMonitorDocument{ + ID: monitor.ID.String(), + PhoneID: monitor.PhoneID.String(), + UserID: string(monitor.UserID), + QueueID: monitor.QueueID, + Owner: monitor.Owner, + PhoneOnline: monitor.PhoneOnline, + CreatedAt: monitor.CreatedAt.UTC(), + UpdatedAt: monitor.UpdatedAt.UTC(), + } + + _, err := repository.collection.InsertOne(ctx, doc) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", phoneNumber}, + } + + var doc heartbeatMonitorDocument + err := repository.collection.FindOne(ctx, filter).Decode(&doc) + if err == mongo.ErrNoDocuments { + msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + monitor, err := docToHeartbeatMonitor(doc) + if err != nil { + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat monitor document")) + } + + return monitor, nil +} + +func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"_id", monitorID.String()}, + } + + count, err := repository.collection.CountDocuments(ctx, filter) + if err != nil { + msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) + return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return count > 0, nil +} + +func (repository *mongoHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{{"_id", monitorID.String()}} + update := bson.D{{"$set", bson.D{ + {"queue_id", queueID}, + {"updated_at", time.Now().UTC()}, + }}} + + _, err := repository.collection.UpdateOne(ctx, filter, update) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", phoneNumber}, + } + + _, err := repository.collection.DeleteMany(ctx, filter) + if err != nil { + msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"_id", monitorID.String()}, + {"user_id", string(userID)}, + } + update := bson.D{{"$set", bson.D{ + {"phone_online", online}, + {"updated_at", time.Now().UTC()}, + }}} + + _, err := repository.collection.UpdateOne(ctx, filter, update) + if err != nil { + msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}}) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func docToHeartbeatMonitor(doc heartbeatMonitorDocument) (*entities.HeartbeatMonitor, error) { + id, err := uuid.Parse(doc.ID) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", doc.ID)) + } + phoneID, err := uuid.Parse(doc.PhoneID) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", doc.PhoneID)) + } + return &entities.HeartbeatMonitor{ + ID: id, + PhoneID: phoneID, + UserID: entities.UserID(doc.UserID), + QueueID: doc.QueueID, + Owner: doc.Owner, + PhoneOnline: doc.PhoneOnline, + CreatedAt: doc.CreatedAt, + UpdatedAt: doc.UpdatedAt, + }, nil +} diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go new file mode 100644 index 00000000..13802e06 --- /dev/null +++ b/api/pkg/repositories/mongo_heartbeat_repository.go @@ -0,0 +1,181 @@ +package repositories + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + + "github.com/NdoleStudio/httpsms/pkg/entities" + "github.com/NdoleStudio/httpsms/pkg/telemetry" + "github.com/palantir/stacktrace" +) + +// mongoHeartbeatRepository is responsible for persisting entities.Heartbeat in MongoDB +type mongoHeartbeatRepository struct { + logger telemetry.Logger + tracer telemetry.Tracer + collection *mongo.Collection +} + +// NewMongoHeartbeatRepository creates the MongoDB version of the HeartbeatRepository +func NewMongoHeartbeatRepository( + logger telemetry.Logger, + tracer telemetry.Tracer, + client *mongo.Client, +) HeartbeatRepository { + return &mongoHeartbeatRepository{ + logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})), + tracer: tracer, + collection: client.Database(mongoDBName).Collection(collectionHeartbeats), + } +} + +type heartbeatDocument struct { + ID string `bson:"_id"` + Owner string `bson:"owner"` + Version string `bson:"version"` + Charging bool `bson:"charging"` + UserID string `bson:"user_id"` + Timestamp time.Time `bson:"timestamp"` +} + +func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { + ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + doc := heartbeatDocument{ + ID: heartbeat.ID.String(), + Owner: heartbeat.Owner, + Version: heartbeat.Version, + Charging: heartbeat.Charging, + UserID: string(heartbeat.UserID), + Timestamp: heartbeat.Timestamp.UTC(), + } + + _, err := repository.collection.InsertOne(ctx, doc) + if err != nil { + msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func (repository *mongoHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", owner}, + } + + if len(params.Query) > 0 { + filter = append(filter, bson.E{"version", bson.D{{"$regex", params.Query}, {"$options", "i"}}}) + } + + opts := options.Find(). + SetSort(bson.D{{"timestamp", -1}}). + SetSkip(int64(params.Skip)). + SetLimit(int64(params.Limit)) + + cursor, err := repository.collection.Find(ctx, filter, opts) + if err != nil { + msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + defer cursor.Close(ctx) + + var docs []heartbeatDocument + if err = cursor.All(ctx, &docs); err != nil { + msg := fmt.Sprintf("cannot decode heartbeats for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + heartbeats := make([]entities.Heartbeat, 0, len(docs)) + for _, doc := range docs { + hb, convertErr := docToHeartbeat(doc) + if convertErr != nil { + msg := fmt.Sprintf("cannot convert heartbeat document for owner [%s]", owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(convertErr, msg)) + } + heartbeats = append(heartbeats, *hb) + } + + return &heartbeats, nil +} + +func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + filter := bson.D{ + {"user_id", string(userID)}, + {"owner", owner}, + } + + opts := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) + + var doc heartbeatDocument + err := repository.collection.FindOne(ctx, filter, opts).Decode(&doc) + if err == mongo.ErrNoDocuments { + msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) + } + if err != nil { + msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + heartbeat, err := docToHeartbeat(doc) + if err != nil { + return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat document")) + } + + return heartbeat, nil +} + +func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { + ctx, span := repository.tracer.Start(ctx) + defer span.End() + + ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) + defer cancel() + + _, err := repository.collection.DeleteMany(ctx, bson.D{{"user_id", string(userID)}}) + if err != nil { + msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) + return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) + } + + return nil +} + +func docToHeartbeat(doc heartbeatDocument) (*entities.Heartbeat, error) { + id, err := uuid.Parse(doc.ID) + if err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", doc.ID)) + } + return &entities.Heartbeat{ + ID: id, + Owner: doc.Owner, + Version: doc.Version, + Charging: doc.Charging, + UserID: entities.UserID(doc.UserID), + Timestamp: doc.Timestamp, + }, nil +} diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go new file mode 100644 index 00000000..80328bed --- /dev/null +++ b/api/pkg/repositories/mongodb.go @@ -0,0 +1,68 @@ +package repositories + +import ( + "context" + "fmt" + "time" + + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + + "github.com/palantir/stacktrace" +) + +const ( + mongoDBName = "httpsms" + collectionHeartbeats = "heartbeats" + collectionHeartbeatMonitors = "heartbeat_monitors" +) + +// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes +func NewMongoDB(uri string) (*mongo.Client, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI) + + client, err := mongo.Connect(opts) + if err != nil { + return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") + } + + if err = client.Ping(ctx, nil); err != nil { + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) + } + + if err = createMongoIndexes(ctx, client); err != nil { + return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes") + } + + return client, nil +} + +func createMongoIndexes(ctx context.Context, client *mongo.Client) error { + db := client.Database(mongoDBName) + + // Heartbeats indexes + heartbeatsCol := db.Collection(collectionHeartbeats) + _, err := heartbeatsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{ + {Keys: bson.D{{"owner", 1}, {"timestamp", -1}}}, + {Keys: bson.D{{"user_id", 1}}}, + }) + if err != nil { + return stacktrace.Propagate(err, "cannot create indexes on heartbeats collection") + } + + // Heartbeat monitors indexes + monitorsCol := db.Collection(collectionHeartbeatMonitors) + _, err = monitorsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{ + {Keys: bson.D{{"user_id", 1}, {"owner", 1}}}, + }) + if err != nil { + return stacktrace.Propagate(err, "cannot create indexes on heartbeat_monitors collection") + } + + return nil +} diff --git a/tests/.env.test b/tests/.env.test index 909902ae..3cec6a66 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -29,4 +29,4 @@ GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= HEARTBEAT_DB_BACKEND=hedging -TURSO_DATABASE_DSN=http://sqld:8080 +MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 1e111b78..515b6298 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -25,10 +25,19 @@ services: timeout: 5s retries: 10 - sqld: - image: ghcr.io/tursodatabase/libsql-server:latest + mongodb: + image: mongo:7 ports: - - "8090:8080" + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: httpsms + MONGO_INITDB_ROOT_PASSWORD: testpassword + healthcheck: + test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 5s wiremock: image: wiremock/wiremock:3x @@ -58,8 +67,8 @@ services: condition: service_healthy wiremock: condition: service_healthy - sqld: - condition: service_started + mongodb: + condition: service_healthy env_file: - .env.test environment: diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue index ebe95562..877565ec 100644 --- a/web/pages/settings/index.vue +++ b/web/pages/settings/index.vue @@ -435,10 +435,18 @@ - - {{ mdiCalendarClock }} - Create Send Schedule - +
+ + {{ mdiCalendarClock }} + Create Send Schedule + + Documentation +
Email Notifications From 440faa5b00ccbd21c2b416f576e9b828da0f76ee Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:10:28 +0300 Subject: [PATCH 02/10] chore: remove unused libSQL/Turso files and dependencies - Delete libsql.go, libsql_heartbeat_repository.go, libsql_heartbeat_monitor_repository.go - Remove TursoDB() method and tursoDB field from DI container - Remove unused database/sql import from container - Run go mod tidy to remove libsql-client-go dependency Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/go.mod | 4 - api/go.sum | 8 - api/pkg/di/container.go | 19 -- api/pkg/repositories/libsql.go | 67 ------ .../libsql_heartbeat_monitor_repository.go | 199 ------------------ .../libsql_heartbeat_repository.go | 186 ---------------- 6 files changed, 483 deletions(-) delete mode 100644 api/pkg/repositories/libsql.go delete mode 100644 api/pkg/repositories/libsql_heartbeat_monitor_repository.go delete mode 100644 api/pkg/repositories/libsql_heartbeat_repository.go diff --git a/api/go.mod b/api/go.mod index 24b70d57..de1338cd 100644 --- a/api/go.mod +++ b/api/go.mod @@ -44,7 +44,6 @@ require ( github.com/stretchr/testify v1.11.1 github.com/swaggo/swag v1.16.6 github.com/thedevsaddam/govalidator v1.9.10 - github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 go.mongodb.org/mongo-driver/v2 v2.6.0 @@ -95,13 +94,11 @@ require ( github.com/PuerkitoBio/goquery v1.12.0 // indirect github.com/andybalholm/brotli v1.2.1 // indirect github.com/andybalholm/cascadia v1.3.3 // indirect - github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/clipperhouse/displaywidth v0.11.0 // indirect github.com/clipperhouse/uax29/v2 v2.7.0 // indirect github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect - github.com/coder/websocket v1.8.12 // indirect github.com/envoyproxy/go-control-plane/envoy v1.37.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.3.3 // indirect github.com/fatih/color v1.19.0 // indirect @@ -191,7 +188,6 @@ require ( go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.50.0 // indirect - golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect diff --git a/api/go.sum b/api/go.sum index d5fa9e1d..be12cc63 100644 --- a/api/go.sum +++ b/api/go.sum @@ -66,8 +66,6 @@ github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eT github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/andybalholm/cascadia v1.3.3 h1:AG2YHrzJIm4BZ19iwJ/DAua6Btl3IwJX+VI4kktS1LM= github.com/andybalholm/cascadia v1.3.3/go.mod h1:xNd9bqTn98Ln4DwST8/nG+H0yuB8Hmgu1YHNnWw0GeA= -github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI= -github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k= github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -90,8 +88,6 @@ github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 h1:aBangftG7EVZoUb69Os github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2/go.mod h1:qwXFYgsP6T7XnJtbKlf1HP8AjxZZyzxMmc+Lq5GjlU4= github.com/cockroachdb/cockroach-go/v2 v2.4.3 h1:LJO3K3jC5WXvMePRQSJE1NsIGoFGcEx1LW83W6RAlhw= github.com/cockroachdb/cockroach-go/v2 v2.4.3/go.mod h1:9U179XbCx4qFWtNhc7BiWLPfuyMVQ7qdAhfrwLz1vH0= -github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= -github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -324,8 +320,6 @@ github.com/thedevsaddam/govalidator v1.9.10 h1:m3dLRbSZ5Hts3VUWYe+vxLMG+FdyQuWOj github.com/thedevsaddam/govalidator v1.9.10/go.mod h1:Ilx8u7cg5g3LXbSS943cx5kczyNuUn7LH/cK5MYuE90= github.com/tiendc/go-deepcopy v1.7.2 h1:Ut2yYR7W9tWjTQitganoIue4UGxZwCcJy3orjrrIj44= github.com/tiendc/go-deepcopy v1.7.2/go.mod h1:4bKjNC2r7boYOkD2IOuZpYjmlDdzjbpTRyCx+goBCJQ= -github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885 h1:YssVXwM/9nUAjGNmUWdgvb05JVcsaBrDn5yr+MaJTn0= -github.com/tursodatabase/libsql-client-go v0.0.0-20260514053736-a9a8fadfe885/go.mod h1:08inkKyguB6CGGssc/JzhmQWwBgFQBgjlYFjxjRh7nU= github.com/uptrace/uptrace-go v1.43.0 h1:5QuCdyFJdWUEXx6Fr6sYfezdgO6n6lnkOvUTLlyQO7U= github.com/uptrace/uptrace-go v1.43.0/go.mod h1:ehDTIdtBSolg4Z0CCvg1C8yR6VX1YFDqBcg2KmsXWn0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -426,8 +420,6 @@ golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 h1:aAcj0Da7eBAtrTp03QXWvm88pSyOt+UgdZw2BFZ+lEw= -golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8/go.mod h1:CQ1k9gNrJ50XIzaKCRR2hssIjF07kZFEiieALBM/ARQ= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 94c2722e..c8b005d9 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -3,7 +3,6 @@ package di import ( "context" "crypto/tls" - "database/sql" "fmt" "net/http" "os" @@ -84,7 +83,6 @@ type Container struct { projectID string db *gorm.DB dedicatedDB *gorm.DB - tursoDB *sql.DB mongoClient *mongoDriver.Client version string app *fiber.App @@ -295,23 +293,6 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { return container.dedicatedDB } -// TursoDB creates a *sql.DB connection to a Turso/libSQL database -func (container *Container) TursoDB() *sql.DB { - if container.tursoDB != nil { - return container.tursoDB - } - - container.logger.Debug("creating Turso *sql.DB connection") - - db, err := repositories.NewTursoDB(os.Getenv("TURSO_DATABASE_DSN")) - if err != nil { - container.logger.Fatal(err) - } - - container.tursoDB = db - return container.tursoDB -} - // MongoDB creates a *mongo.Client connection to MongoDB Atlas func (container *Container) MongoDB() *mongoDriver.Client { if container.mongoClient != nil { diff --git a/api/pkg/repositories/libsql.go b/api/pkg/repositories/libsql.go deleted file mode 100644 index 66ee8c0e..00000000 --- a/api/pkg/repositories/libsql.go +++ /dev/null @@ -1,67 +0,0 @@ -package repositories - -import ( - "database/sql" - "fmt" - - _ "github.com/tursodatabase/libsql-client-go/libsql" // libSQL database driver - - "github.com/palantir/stacktrace" -) - -const ( - tableHeartbeats = "heartbeats" - tableHeartbeatMonitors = "heartbeat_monitors" -) - -// NewTursoDB creates a new *sql.DB connection to a Turso database and auto-creates tables -func NewTursoDB(dsn string) (*sql.DB, error) { - db, err := sql.Open("libsql", dsn) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot open turso database with DSN [%s]", dsn)) - } - - if err = db.Ping(); err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping turso database with DSN [%s]", dsn)) - } - - if err = createTursoTables(db); err != nil { - return nil, stacktrace.Propagate(err, "cannot create turso tables") - } - - return db, nil -} - -func createTursoTables(db *sql.DB) error { - statements := []string{ - `CREATE TABLE IF NOT EXISTS ` + tableHeartbeats + ` ( - id TEXT PRIMARY KEY, - owner TEXT NOT NULL, - version TEXT NOT NULL, - charging INTEGER NOT NULL DEFAULT 0, - user_id TEXT NOT NULL, - timestamp DATETIME NOT NULL - )`, - `CREATE INDEX IF NOT EXISTS idx_heartbeats_owner_timestamp ON ` + tableHeartbeats + `(owner, timestamp)`, - `CREATE INDEX IF NOT EXISTS idx_heartbeats_user_id ON ` + tableHeartbeats + `(user_id)`, - `CREATE TABLE IF NOT EXISTS ` + tableHeartbeatMonitors + ` ( - id TEXT PRIMARY KEY, - phone_id TEXT NOT NULL, - user_id TEXT NOT NULL, - queue_id TEXT NOT NULL DEFAULT '', - owner TEXT NOT NULL, - phone_online INTEGER NOT NULL DEFAULT 1, - created_at DATETIME NOT NULL, - updated_at DATETIME NOT NULL - )`, - `CREATE INDEX IF NOT EXISTS idx_heartbeat_monitors_user_owner ON ` + tableHeartbeatMonitors + `(user_id, owner)`, - } - - for _, stmt := range statements { - if _, err := db.Exec(stmt); err != nil { - return stacktrace.Propagate(err, fmt.Sprintf("cannot execute statement: %s", stmt)) - } - } - - return nil -} diff --git a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go b/api/pkg/repositories/libsql_heartbeat_monitor_repository.go deleted file mode 100644 index 0cb594af..00000000 --- a/api/pkg/repositories/libsql_heartbeat_monitor_repository.go +++ /dev/null @@ -1,199 +0,0 @@ -package repositories - -import ( - "context" - "database/sql" - "fmt" - "time" - - "github.com/google/uuid" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// libsqlHeartbeatMonitorRepository is responsible for persisting entities.HeartbeatMonitor in Turso/libSQL -type libsqlHeartbeatMonitorRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - db *sql.DB -} - -// NewLibsqlHeartbeatMonitorRepository creates the libSQL version of the HeartbeatMonitorRepository -func NewLibsqlHeartbeatMonitorRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - db *sql.DB, -) HeartbeatMonitorRepository { - return &libsqlHeartbeatMonitorRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatMonitorRepository{})), - tracer: tracer, - db: db, - } -} - -func (repository *libsqlHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "INSERT INTO "+tableHeartbeatMonitors+" (id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", - monitor.ID.String(), - monitor.PhoneID.String(), - string(monitor.UserID), - monitor.QueueID, - monitor.Owner, - boolToInt(monitor.PhoneOnline), - monitor.CreatedAt.UTC(), - monitor.UpdatedAt.UTC(), - ) - if err != nil { - msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Load(ctx context.Context, userID entities.UserID, phoneNumber string) (*entities.HeartbeatMonitor, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - row := repository.db.QueryRowContext(ctx, - "SELECT id, phone_id, user_id, queue_id, owner, phone_online, created_at, updated_at FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ? LIMIT 1", - string(userID), phoneNumber, - ) - - monitor, err := repository.scanHeartbeatMonitorRow(row) - if err == sql.ErrNoRows { - msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) - } - if err != nil { - msg := fmt.Sprintf("cannot load heartbeat monitor with userID [%s] and owner [%s]", userID, phoneNumber) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return monitor, nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - var count int - err := repository.db.QueryRowContext(ctx, - "SELECT COUNT(*) FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND id = ?", - string(userID), monitorID.String(), - ).Scan(&count) - if err != nil { - msg := fmt.Sprintf("cannot check if heartbeat monitor exists with userID [%s] and monitor ID [%s]", userID, monitorID) - return false, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return count > 0, nil -} - -func (repository *libsqlHeartbeatMonitorRepository) UpdateQueueID(ctx context.Context, monitorID uuid.UUID, queueID string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "UPDATE "+tableHeartbeatMonitors+" SET queue_id = ?, updated_at = ? WHERE id = ?", - queueID, time.Now().UTC(), monitorID.String(), - ) - if err != nil { - msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s]", monitorID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) Delete(ctx context.Context, userID entities.UserID, phoneNumber string) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ? AND owner = ?", - string(userID), phoneNumber, - ) - if err != nil { - msg := fmt.Sprintf("cannot delete heartbeat monitor with owner [%s] and userID [%s]", phoneNumber, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) UpdatePhoneOnline(ctx context.Context, userID entities.UserID, monitorID uuid.UUID, online bool) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "UPDATE "+tableHeartbeatMonitors+" SET phone_online = ?, updated_at = ? WHERE id = ? AND user_id = ?", - boolToInt(online), time.Now().UTC(), monitorID.String(), string(userID), - ) - if err != nil { - msg := fmt.Sprintf("cannot update heartbeat monitor ID [%s] for user [%s]", monitorID, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeatMonitors+" WHERE user_id = ?", string(userID)) - if err != nil { - msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.HeartbeatMonitor{}, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatMonitorRepository) scanHeartbeatMonitorRow(row *sql.Row) (*entities.HeartbeatMonitor, error) { - monitor := new(entities.HeartbeatMonitor) - var id, phoneID, userID string - var phoneOnline int - err := row.Scan(&id, &phoneID, &userID, &monitor.QueueID, &monitor.Owner, &phoneOnline, &monitor.CreatedAt, &monitor.UpdatedAt) - if err != nil { - return nil, err - } - monitor.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", id)) - } - monitor.PhoneID, err = uuid.Parse(phoneID) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", phoneID)) - } - monitor.UserID = entities.UserID(userID) - monitor.PhoneOnline = phoneOnline != 0 - return monitor, nil -} diff --git a/api/pkg/repositories/libsql_heartbeat_repository.go b/api/pkg/repositories/libsql_heartbeat_repository.go deleted file mode 100644 index 42fdf911..00000000 --- a/api/pkg/repositories/libsql_heartbeat_repository.go +++ /dev/null @@ -1,186 +0,0 @@ -package repositories - -import ( - "context" - "database/sql" - "fmt" - - "github.com/google/uuid" - - "github.com/NdoleStudio/httpsms/pkg/entities" - "github.com/NdoleStudio/httpsms/pkg/telemetry" - "github.com/palantir/stacktrace" -) - -// libsqlHeartbeatRepository is responsible for persisting entities.Heartbeat in Turso/libSQL -type libsqlHeartbeatRepository struct { - logger telemetry.Logger - tracer telemetry.Tracer - db *sql.DB -} - -// NewLibsqlHeartbeatRepository creates the libSQL version of the HeartbeatRepository -func NewLibsqlHeartbeatRepository( - logger telemetry.Logger, - tracer telemetry.Tracer, - db *sql.DB, -) HeartbeatRepository { - return &libsqlHeartbeatRepository{ - logger: logger.WithService(fmt.Sprintf("%T", &libsqlHeartbeatRepository{})), - tracer: tracer, - db: db, - } -} - -func (repository *libsqlHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { - ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, - "INSERT INTO "+tableHeartbeats+" (id, owner, version, charging, user_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)", - heartbeat.ID.String(), - heartbeat.Owner, - heartbeat.Version, - boolToInt(heartbeat.Charging), - string(heartbeat.UserID), - heartbeat.Timestamp.UTC(), - ) - if err != nil { - msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatRepository) Index(ctx context.Context, userID entities.UserID, owner string, params IndexParams) (*[]entities.Heartbeat, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - var rows *sql.Rows - var err error - - if len(params.Query) > 0 { - queryPattern := "%" + params.Query + "%" - rows, err = repository.db.QueryContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? AND version LIKE ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", - string(userID), owner, queryPattern, params.Limit, params.Skip, - ) - } else { - rows, err = repository.db.QueryContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT ? OFFSET ?", - string(userID), owner, params.Limit, params.Skip, - ) - } - if err != nil { - msg := fmt.Sprintf("cannot fetch heartbeats with owner [%s] and params [%+#v]", owner, params) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - defer rows.Close() - - heartbeats := make([]entities.Heartbeat, 0) - for rows.Next() { - heartbeat, scanErr := repository.scanHeartbeat(rows) - if scanErr != nil { - msg := fmt.Sprintf("cannot scan heartbeat row for owner [%s]", owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(scanErr, msg)) - } - heartbeats = append(heartbeats, *heartbeat) - } - if rowsErr := rows.Err(); rowsErr != nil { - msg := fmt.Sprintf("error iterating heartbeat rows for owner [%s]", owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(rowsErr, msg)) - } - - return &heartbeats, nil -} - -func (repository *libsqlHeartbeatRepository) Last(ctx context.Context, userID entities.UserID, owner string) (*entities.Heartbeat, error) { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - row := repository.db.QueryRowContext(ctx, - "SELECT id, owner, version, charging, user_id, timestamp FROM "+tableHeartbeats+" WHERE user_id = ? AND owner = ? ORDER BY timestamp DESC LIMIT 1", - string(userID), owner, - ) - - heartbeat, err := repository.scanHeartbeatRow(row) - if err == sql.ErrNoRows { - msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) - } - if err != nil { - msg := fmt.Sprintf("cannot load heartbeat with userID [%s] and owner [%s]", userID, owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return heartbeat, nil -} - -func (repository *libsqlHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { - ctx, span := repository.tracer.Start(ctx) - defer span.End() - - ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) - defer cancel() - - _, err := repository.db.ExecContext(ctx, "DELETE FROM "+tableHeartbeats+" WHERE user_id = ?", string(userID)) - if err != nil { - msg := fmt.Sprintf("cannot delete all [%T] for user with ID [%s]", &entities.Heartbeat{}, userID) - return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) - } - - return nil -} - -func (repository *libsqlHeartbeatRepository) scanHeartbeat(rows *sql.Rows) (*entities.Heartbeat, error) { - heartbeat := new(entities.Heartbeat) - var id string - var charging int - var userID string - err := rows.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) - if err != nil { - return nil, err - } - heartbeat.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) - } - heartbeat.Charging = charging != 0 - heartbeat.UserID = entities.UserID(userID) - return heartbeat, nil -} - -func (repository *libsqlHeartbeatRepository) scanHeartbeatRow(row *sql.Row) (*entities.Heartbeat, error) { - heartbeat := new(entities.Heartbeat) - var id string - var charging int - var userID string - err := row.Scan(&id, &heartbeat.Owner, &heartbeat.Version, &charging, &userID, &heartbeat.Timestamp) - if err != nil { - return nil, err - } - heartbeat.ID, err = uuid.Parse(id) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", id)) - } - heartbeat.Charging = charging != 0 - heartbeat.UserID = entities.UserID(userID) - return heartbeat, nil -} - -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 -} From 24f1fc246893a63c4d1c9b49caad06bd4c5ac1ae Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:14:17 +0300 Subject: [PATCH 03/10] refactor: use entity structs directly with bson tags instead of document types - Add bson struct tags to entities.Heartbeat and entities.HeartbeatMonitor - Register custom UUID codec so uuid.UUID is stored as string _id in MongoDB - Remove intermediate heartbeatDocument/heartbeatMonitorDocument structs - MongoDB repositories now marshal/unmarshal entities directly Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/entities/heartbeat.go | 12 ++-- api/pkg/entities/heartbeat_monitor.go | 16 ++--- .../mongo_heartbeat_monitor_repository.go | 56 ++--------------- .../mongo_heartbeat_repository.go | 62 +++---------------- api/pkg/repositories/mongodb.go | 33 +++++++++- 5 files changed, 58 insertions(+), 121 deletions(-) diff --git a/api/pkg/entities/heartbeat.go b/api/pkg/entities/heartbeat.go index 629efd29..abc070e9 100644 --- a/api/pkg/entities/heartbeat.go +++ b/api/pkg/entities/heartbeat.go @@ -8,10 +8,10 @@ import ( // Heartbeat represents is a pulse from an active phone type Heartbeat struct { - ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" example:"+18005550199"` - Version string `json:"version" example:"344c10f"` - Charging bool `json:"charging" example:"true"` - UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` - Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" example:"2022-06-05T14:26:01.520828+03:00"` + ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + Owner string `json:"owner" gorm:"index:idx_heartbeats_owner_timestamp" bson:"owner" example:"+18005550199"` + Version string `json:"version" bson:"version" example:"344c10f"` + Charging bool `json:"charging" bson:"charging" example:"true"` + UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` + Timestamp time.Time `json:"timestamp" gorm:"index:idx_heartbeats_owner_timestamp" bson:"timestamp" example:"2022-06-05T14:26:01.520828+03:00"` } diff --git a/api/pkg/entities/heartbeat_monitor.go b/api/pkg/entities/heartbeat_monitor.go index 6b41a31a..7151f195 100644 --- a/api/pkg/entities/heartbeat_monitor.go +++ b/api/pkg/entities/heartbeat_monitor.go @@ -8,14 +8,14 @@ import ( // HeartbeatMonitor is used to monitor heartbeats of a phone type HeartbeatMonitor struct { - ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - PhoneID uuid.UUID `json:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` - UserID UserID `json:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` - QueueID string `json:"queue_id" example:"0360259236613675274"` - Owner string `json:"owner" example:"+18005550199"` - PhoneOnline bool `json:"phone_online" example:"true" default:"true"` - CreatedAt time.Time `json:"created_at" example:"2022-06-05T14:26:02.302718+03:00"` - UpdatedAt time.Time `json:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"` + ID uuid.UUID `json:"id" gorm:"primaryKey;type:uuid;" bson:"_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + PhoneID uuid.UUID `json:"phone_id" bson:"phone_id" example:"32343a19-da5e-4b1b-a767-3298a73703cb"` + UserID UserID `json:"user_id" bson:"user_id" example:"WB7DRDWrJZRGbYrv2CKGkqbzvqdC"` + QueueID string `json:"queue_id" bson:"queue_id" example:"0360259236613675274"` + Owner string `json:"owner" bson:"owner" example:"+18005550199"` + PhoneOnline bool `json:"phone_online" bson:"phone_online" example:"true" default:"true"` + CreatedAt time.Time `json:"created_at" bson:"created_at" example:"2022-06-05T14:26:02.302718+03:00"` + UpdatedAt time.Time `json:"updated_at" bson:"updated_at" example:"2022-06-05T14:26:10.303278+03:00"` } // RequiresCheck returns true if the heartbeat monitor requires a check diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go index 1f5f014f..ae9a68cd 100644 --- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go @@ -34,17 +34,6 @@ func NewMongoHeartbeatMonitorRepository( } } -type heartbeatMonitorDocument struct { - ID string `bson:"_id"` - PhoneID string `bson:"phone_id"` - UserID string `bson:"user_id"` - QueueID string `bson:"queue_id"` - Owner string `bson:"owner"` - PhoneOnline bool `bson:"phone_online"` - CreatedAt time.Time `bson:"created_at"` - UpdatedAt time.Time `bson:"updated_at"` -} - func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, monitor *entities.HeartbeatMonitor) error { ctx, span := repository.tracer.Start(ctx) defer span.End() @@ -52,18 +41,7 @@ func (repository *mongoHeartbeatMonitorRepository) Store(ctx context.Context, mo ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) defer cancel() - doc := heartbeatMonitorDocument{ - ID: monitor.ID.String(), - PhoneID: monitor.PhoneID.String(), - UserID: string(monitor.UserID), - QueueID: monitor.QueueID, - Owner: monitor.Owner, - PhoneOnline: monitor.PhoneOnline, - CreatedAt: monitor.CreatedAt.UTC(), - UpdatedAt: monitor.UpdatedAt.UTC(), - } - - _, err := repository.collection.InsertOne(ctx, doc) + _, err := repository.collection.InsertOne(ctx, monitor) if err != nil { msg := fmt.Sprintf("cannot save heartbeat monitor with ID [%s]", monitor.ID) return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) @@ -84,8 +62,8 @@ func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, use {"owner", phoneNumber}, } - var doc heartbeatMonitorDocument - err := repository.collection.FindOne(ctx, filter).Decode(&doc) + var monitor entities.HeartbeatMonitor + err := repository.collection.FindOne(ctx, filter).Decode(&monitor) if err == mongo.ErrNoDocuments { msg := fmt.Sprintf("heartbeat monitor with userID [%s] and owner [%s] does not exist", userID, phoneNumber) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) @@ -95,12 +73,7 @@ func (repository *mongoHeartbeatMonitorRepository) Load(ctx context.Context, use return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } - monitor, err := docToHeartbeatMonitor(doc) - if err != nil { - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat monitor document")) - } - - return monitor, nil + return &monitor, nil } func (repository *mongoHeartbeatMonitorRepository) Exists(ctx context.Context, userID entities.UserID, monitorID uuid.UUID) (bool, error) { @@ -207,24 +180,3 @@ func (repository *mongoHeartbeatMonitorRepository) DeleteAllForUser(ctx context. return nil } - -func docToHeartbeatMonitor(doc heartbeatMonitorDocument) (*entities.HeartbeatMonitor, error) { - id, err := uuid.Parse(doc.ID) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor ID [%s]", doc.ID)) - } - phoneID, err := uuid.Parse(doc.PhoneID) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat monitor phone ID [%s]", doc.PhoneID)) - } - return &entities.HeartbeatMonitor{ - ID: id, - PhoneID: phoneID, - UserID: entities.UserID(doc.UserID), - QueueID: doc.QueueID, - Owner: doc.Owner, - PhoneOnline: doc.PhoneOnline, - CreatedAt: doc.CreatedAt, - UpdatedAt: doc.UpdatedAt, - }, nil -} diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go index 13802e06..68ea6e5f 100644 --- a/api/pkg/repositories/mongo_heartbeat_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_repository.go @@ -3,9 +3,7 @@ package repositories import ( "context" "fmt" - "time" - "github.com/google/uuid" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -35,15 +33,6 @@ func NewMongoHeartbeatRepository( } } -type heartbeatDocument struct { - ID string `bson:"_id"` - Owner string `bson:"owner"` - Version string `bson:"version"` - Charging bool `bson:"charging"` - UserID string `bson:"user_id"` - Timestamp time.Time `bson:"timestamp"` -} - func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat *entities.Heartbeat) error { ctx, span, _ := repository.tracer.StartWithLogger(ctx, repository.logger) defer span.End() @@ -51,16 +40,7 @@ func (repository *mongoHeartbeatRepository) Store(ctx context.Context, heartbeat ctx, cancel := context.WithTimeout(ctx, dbOperationDuration) defer cancel() - doc := heartbeatDocument{ - ID: heartbeat.ID.String(), - Owner: heartbeat.Owner, - Version: heartbeat.Version, - Charging: heartbeat.Charging, - UserID: string(heartbeat.UserID), - Timestamp: heartbeat.Timestamp.UTC(), - } - - _, err := repository.collection.InsertOne(ctx, doc) + _, err := repository.collection.InsertOne(ctx, heartbeat) if err != nil { msg := fmt.Sprintf("cannot save heartbeat with ID [%s]", heartbeat.ID) return repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) @@ -97,20 +77,14 @@ func (repository *mongoHeartbeatRepository) Index(ctx context.Context, userID en } defer cursor.Close(ctx) - var docs []heartbeatDocument - if err = cursor.All(ctx, &docs); err != nil { + var heartbeats []entities.Heartbeat + if err = cursor.All(ctx, &heartbeats); err != nil { msg := fmt.Sprintf("cannot decode heartbeats for owner [%s]", owner) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } - heartbeats := make([]entities.Heartbeat, 0, len(docs)) - for _, doc := range docs { - hb, convertErr := docToHeartbeat(doc) - if convertErr != nil { - msg := fmt.Sprintf("cannot convert heartbeat document for owner [%s]", owner) - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(convertErr, msg)) - } - heartbeats = append(heartbeats, *hb) + if heartbeats == nil { + heartbeats = make([]entities.Heartbeat, 0) } return &heartbeats, nil @@ -130,8 +104,8 @@ func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID ent opts := options.FindOne().SetSort(bson.D{{"timestamp", -1}}) - var doc heartbeatDocument - err := repository.collection.FindOne(ctx, filter, opts).Decode(&doc) + var heartbeat entities.Heartbeat + err := repository.collection.FindOne(ctx, filter, opts).Decode(&heartbeat) if err == mongo.ErrNoDocuments { msg := fmt.Sprintf("heartbeat with userID [%s] and owner [%s] does not exist", userID, owner) return nil, repository.tracer.WrapErrorSpan(span, stacktrace.PropagateWithCode(err, ErrCodeNotFound, msg)) @@ -141,12 +115,7 @@ func (repository *mongoHeartbeatRepository) Last(ctx context.Context, userID ent return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, msg)) } - heartbeat, err := docToHeartbeat(doc) - if err != nil { - return nil, repository.tracer.WrapErrorSpan(span, stacktrace.Propagate(err, "cannot convert heartbeat document")) - } - - return heartbeat, nil + return &heartbeat, nil } func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context, userID entities.UserID) error { @@ -164,18 +133,3 @@ func (repository *mongoHeartbeatRepository) DeleteAllForUser(ctx context.Context return nil } - -func docToHeartbeat(doc heartbeatDocument) (*entities.Heartbeat, error) { - id, err := uuid.Parse(doc.ID) - if err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot parse heartbeat ID [%s]", doc.ID)) - } - return &entities.Heartbeat{ - ID: id, - Owner: doc.Owner, - Version: doc.Version, - Charging: doc.Charging, - UserID: entities.UserID(doc.UserID), - Timestamp: doc.Timestamp, - }, nil -} diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go index 80328bed..e991e13b 100644 --- a/api/pkg/repositories/mongodb.go +++ b/api/pkg/repositories/mongodb.go @@ -3,8 +3,10 @@ package repositories import ( "context" "fmt" + "reflect" "time" + "github.com/google/uuid" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" @@ -18,13 +20,42 @@ const ( collectionHeartbeatMonitors = "heartbeat_monitors" ) +// uuidEncodeValue encodes uuid.UUID as a BSON string +func uuidEncodeValue(_ bson.EncodeContext, vw bson.ValueWriter, val reflect.Value) error { + u := val.Interface().(uuid.UUID) + return vw.WriteString(u.String()) +} + +// uuidDecodeValue decodes a BSON string into uuid.UUID +func uuidDecodeValue(_ bson.DecodeContext, vr bson.ValueReader, val reflect.Value) error { + str, err := vr.ReadString() + if err != nil { + return err + } + parsed, err := uuid.Parse(str) + if err != nil { + return err + } + val.Set(reflect.ValueOf(parsed)) + return nil +} + +// newMongoRegistry creates a BSON registry that encodes uuid.UUID as strings +func newMongoRegistry() *bson.Registry { + rb := bson.NewRegistry() + rb.RegisterTypeEncoder(reflect.TypeOf(uuid.UUID{}), bson.ValueEncoderFunc(uuidEncodeValue)) + rb.RegisterTypeDecoder(reflect.TypeOf(uuid.UUID{}), bson.ValueDecoderFunc(uuidDecodeValue)) + return rb +} + // NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes func NewMongoDB(uri string) (*mongo.Client, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() serverAPI := options.ServerAPI(options.ServerAPIVersion1) - opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI) + registry := newMongoRegistry() + opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI).SetRegistry(registry) client, err := mongo.Connect(opts) if err != nil { From 6a92f6ca8a21e8d94aac31265c41e7f5e99f9eb1 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:17:58 +0300 Subject: [PATCH 04/10] refactor: derive MongoDB database name from URI appName parameter - Remove hardcoded mongoDBName constant - Parse appName query parameter from MONGODB_URI as the database name - NewMongoDB now returns (client, dbName, error) - Repository constructors accept dbName parameter - DI container caches and passes the DB name to repositories - Update test .env to include appName=httpsms in URI Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 18 ++++++-- .../mongo_heartbeat_monitor_repository.go | 3 +- .../mongo_heartbeat_repository.go | 3 +- api/pkg/repositories/mongodb.go | 41 ++++++++++++++----- tests/.env.test | 2 +- 5 files changed, 51 insertions(+), 16 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index c8b005d9..4fbde132 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -84,6 +84,7 @@ type Container struct { db *gorm.DB dedicatedDB *gorm.DB mongoClient *mongoDriver.Client + mongoDBName string version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -301,15 +302,24 @@ func (container *Container) MongoDB() *mongoDriver.Client { container.logger.Debug("creating MongoDB *mongo.Client connection") - client, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) + client, dbName, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) if err != nil { container.logger.Fatal(err) } container.mongoClient = client + container.mongoDBName = dbName return container.mongoClient } +// MongoDBName returns the MongoDB database name derived from the connection URI appName +func (container *Container) MongoDBName() string { + if container.mongoClient == nil { + container.MongoDB() + } + return container.mongoDBName +} + // HedgingFailureCounter creates an OTel counter for hedging secondary write failures func (container *Container) HedgingFailureCounter() otelMetric.Int64Counter { meter := otel.GetMeterProvider().Meter( @@ -928,6 +938,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), container.MongoDB(), + container.MongoDBName(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") @@ -935,7 +946,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()), + repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()), container.HedgingFailureCounter(), ) default: @@ -1766,6 +1777,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), container.MongoDB(), + container.MongoDBName(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatRepository") @@ -1773,7 +1785,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()), + repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()), container.HedgingFailureCounter(), ) default: diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go index ae9a68cd..5d9541dd 100644 --- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go @@ -26,11 +26,12 @@ func NewMongoHeartbeatMonitorRepository( logger telemetry.Logger, tracer telemetry.Tracer, client *mongo.Client, + dbName string, ) HeartbeatMonitorRepository { return &mongoHeartbeatMonitorRepository{ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})), tracer: tracer, - collection: client.Database(mongoDBName).Collection(collectionHeartbeatMonitors), + collection: client.Database(dbName).Collection(collectionHeartbeatMonitors), } } diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go index 68ea6e5f..ed2f8d8a 100644 --- a/api/pkg/repositories/mongo_heartbeat_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_repository.go @@ -25,11 +25,12 @@ func NewMongoHeartbeatRepository( logger telemetry.Logger, tracer telemetry.Tracer, client *mongo.Client, + dbName string, ) HeartbeatRepository { return &mongoHeartbeatRepository{ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})), tracer: tracer, - collection: client.Database(mongoDBName).Collection(collectionHeartbeats), + collection: client.Database(dbName).Collection(collectionHeartbeats), } } diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go index e991e13b..aeb24a0f 100644 --- a/api/pkg/repositories/mongodb.go +++ b/api/pkg/repositories/mongodb.go @@ -3,6 +3,7 @@ package repositories import ( "context" "fmt" + "net/url" "reflect" "time" @@ -15,7 +16,6 @@ import ( ) const ( - mongoDBName = "httpsms" collectionHeartbeats = "heartbeats" collectionHeartbeatMonitors = "heartbeat_monitors" ) @@ -48,8 +48,14 @@ func newMongoRegistry() *bson.Registry { return rb } -// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes -func NewMongoDB(uri string) (*mongo.Client, error) { +// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes. +// The database name is derived from the appName query parameter in the URI. +func NewMongoDB(uri string) (*mongo.Client, string, error) { + dbName, err := parseMongoDBName(uri) + if err != nil { + return nil, "", stacktrace.Propagate(err, "cannot parse database name from MongoDB URI") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -59,22 +65,37 @@ func NewMongoDB(uri string) (*mongo.Client, error) { client, err := mongo.Connect(opts) if err != nil { - return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") + return nil, "", stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") } if err = client.Ping(ctx, nil); err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) + return nil, "", stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) + } + + if err = createMongoIndexes(ctx, client, dbName); err != nil { + return nil, "", stacktrace.Propagate(err, "cannot create MongoDB indexes") + } + + return client, dbName, nil +} + +// parseMongoDBName extracts the appName query parameter from the MongoDB URI to use as the database name +func parseMongoDBName(uri string) (string, error) { + parsed, err := url.Parse(uri) + if err != nil { + return "", stacktrace.Propagate(err, fmt.Sprintf("cannot parse MongoDB URI [%s]", uri)) } - if err = createMongoIndexes(ctx, client); err != nil { - return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes") + appName := parsed.Query().Get("appName") + if appName == "" { + return "", stacktrace.NewError("MongoDB URI is missing the 'appName' query parameter which is used as the database name") } - return client, nil + return appName, nil } -func createMongoIndexes(ctx context.Context, client *mongo.Client) error { - db := client.Database(mongoDBName) +func createMongoIndexes(ctx context.Context, client *mongo.Client, dbName string) error { + db := client.Database(dbName) // Heartbeats indexes heartbeatsCol := db.Collection(collectionHeartbeats) diff --git a/tests/.env.test b/tests/.env.test index 3cec6a66..ac18af92 100644 --- a/tests/.env.test +++ b/tests/.env.test @@ -29,4 +29,4 @@ GCS_BUCKET_NAME= UPTRACE_DSN= CLOUDFLARE_TURNSTILE_SECRET_KEY= HEARTBEAT_DB_BACKEND=hedging -MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin +MONGODB_URI=mongodb://httpsms:testpassword@mongodb:27017/?authSource=admin&appName=httpsms From fad8b324f1b7c0d835e49646b664f66214716b34 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:20:14 +0300 Subject: [PATCH 05/10] refactor: pass *mongo.Database directly to repository constructors - NewMongoDB now returns *mongo.Database instead of (*mongo.Client, dbName) - Repository constructors accept *mongo.Database instead of client + dbName - DI container caches the *mongo.Database singleton directly - Removes MongoDBName() helper - no longer needed Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/di/container.go | 34 ++++++------------- .../mongo_heartbeat_monitor_repository.go | 5 ++- .../mongo_heartbeat_repository.go | 5 ++- api/pkg/repositories/mongodb.go | 22 ++++++------ 4 files changed, 26 insertions(+), 40 deletions(-) diff --git a/api/pkg/di/container.go b/api/pkg/di/container.go index 4fbde132..e9ca3718 100644 --- a/api/pkg/di/container.go +++ b/api/pkg/di/container.go @@ -83,8 +83,7 @@ type Container struct { projectID string db *gorm.DB dedicatedDB *gorm.DB - mongoClient *mongoDriver.Client - mongoDBName string + mongoDB *mongoDriver.Database version string app *fiber.App eventDispatcher *services.EventDispatcher @@ -294,30 +293,21 @@ func (container *Container) DedicatedDB() (db *gorm.DB) { return container.dedicatedDB } -// MongoDB creates a *mongo.Client connection to MongoDB Atlas -func (container *Container) MongoDB() *mongoDriver.Client { - if container.mongoClient != nil { - return container.mongoClient +// MongoDB creates a *mongo.Database connection to MongoDB Atlas +func (container *Container) MongoDB() *mongoDriver.Database { + if container.mongoDB != nil { + return container.mongoDB } - container.logger.Debug("creating MongoDB *mongo.Client connection") + container.logger.Debug("creating MongoDB *mongo.Database connection") - client, dbName, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) + db, err := repositories.NewMongoDB(os.Getenv("MONGODB_URI")) if err != nil { container.logger.Fatal(err) } - container.mongoClient = client - container.mongoDBName = dbName - return container.mongoClient -} - -// MongoDBName returns the MongoDB database name derived from the connection URI appName -func (container *Container) MongoDBName() string { - if container.mongoClient == nil { - container.MongoDB() - } - return container.mongoDBName + container.mongoDB = db + return container.mongoDB } // HedgingFailureCounter creates an OTel counter for hedging secondary write failures @@ -938,7 +928,6 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), container.MongoDB(), - container.MongoDBName(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatMonitorRepository") @@ -946,7 +935,7 @@ func (container *Container) HeartbeatMonitorRepository() (repository repositorie container.Logger(), container.Tracer(), repositories.NewGormHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()), + repositories.NewMongoHeartbeatMonitorRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: @@ -1777,7 +1766,6 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), container.MongoDB(), - container.MongoDBName(), ) case "hedging": container.logger.Debug("creating hedging repositories.HeartbeatRepository") @@ -1785,7 +1773,7 @@ func (container *Container) HeartbeatRepository() repositories.HeartbeatReposito container.Logger(), container.Tracer(), repositories.NewGormHeartbeatRepository(container.Logger(), container.Tracer(), container.DedicatedDB()), - repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB(), container.MongoDBName()), + repositories.NewMongoHeartbeatRepository(container.Logger(), container.Tracer(), container.MongoDB()), container.HedgingFailureCounter(), ) default: diff --git a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go index 5d9541dd..13200c07 100644 --- a/api/pkg/repositories/mongo_heartbeat_monitor_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_monitor_repository.go @@ -25,13 +25,12 @@ type mongoHeartbeatMonitorRepository struct { func NewMongoHeartbeatMonitorRepository( logger telemetry.Logger, tracer telemetry.Tracer, - client *mongo.Client, - dbName string, + db *mongo.Database, ) HeartbeatMonitorRepository { return &mongoHeartbeatMonitorRepository{ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatMonitorRepository{})), tracer: tracer, - collection: client.Database(dbName).Collection(collectionHeartbeatMonitors), + collection: db.Collection(collectionHeartbeatMonitors), } } diff --git a/api/pkg/repositories/mongo_heartbeat_repository.go b/api/pkg/repositories/mongo_heartbeat_repository.go index ed2f8d8a..d8a7839c 100644 --- a/api/pkg/repositories/mongo_heartbeat_repository.go +++ b/api/pkg/repositories/mongo_heartbeat_repository.go @@ -24,13 +24,12 @@ type mongoHeartbeatRepository struct { func NewMongoHeartbeatRepository( logger telemetry.Logger, tracer telemetry.Tracer, - client *mongo.Client, - dbName string, + db *mongo.Database, ) HeartbeatRepository { return &mongoHeartbeatRepository{ logger: logger.WithService(fmt.Sprintf("%T", &mongoHeartbeatRepository{})), tracer: tracer, - collection: client.Database(dbName).Collection(collectionHeartbeats), + collection: db.Collection(collectionHeartbeats), } } diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go index aeb24a0f..bbad0c07 100644 --- a/api/pkg/repositories/mongodb.go +++ b/api/pkg/repositories/mongodb.go @@ -48,12 +48,12 @@ func newMongoRegistry() *bson.Registry { return rb } -// NewMongoDB creates a new *mongo.Client connection to MongoDB Atlas and ensures indexes. +// NewMongoDB creates a new *mongo.Database connection to MongoDB Atlas and ensures indexes. // The database name is derived from the appName query parameter in the URI. -func NewMongoDB(uri string) (*mongo.Client, string, error) { +func NewMongoDB(uri string) (*mongo.Database, error) { dbName, err := parseMongoDBName(uri) if err != nil { - return nil, "", stacktrace.Propagate(err, "cannot parse database name from MongoDB URI") + return nil, stacktrace.Propagate(err, "cannot parse database name from MongoDB URI") } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -65,18 +65,20 @@ func NewMongoDB(uri string) (*mongo.Client, string, error) { client, err := mongo.Connect(opts) if err != nil { - return nil, "", stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") + return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") } if err = client.Ping(ctx, nil); err != nil { - return nil, "", stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) + return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) } - if err = createMongoIndexes(ctx, client, dbName); err != nil { - return nil, "", stacktrace.Propagate(err, "cannot create MongoDB indexes") + db := client.Database(dbName) + + if err = createMongoIndexes(ctx, db); err != nil { + return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes") } - return client, dbName, nil + return db, nil } // parseMongoDBName extracts the appName query parameter from the MongoDB URI to use as the database name @@ -94,9 +96,7 @@ func parseMongoDBName(uri string) (string, error) { return appName, nil } -func createMongoIndexes(ctx context.Context, client *mongo.Client, dbName string) error { - db := client.Database(dbName) - +func createMongoIndexes(ctx context.Context, db *mongo.Database) error { // Heartbeats indexes heartbeatsCol := db.Collection(collectionHeartbeats) _, err := heartbeatsCol.Indexes().CreateMany(ctx, []mongo.IndexModel{ From 563453e07df41f21715f400cbdaa5094ad8d39c9 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:20:43 +0300 Subject: [PATCH 06/10] fix: correct outgoing message queue docs URL in settings Fix typo in URL: outgiong -> outgoing Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- web/pages/settings/index.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue index 877565ec..2b4348ca 100644 --- a/web/pages/settings/index.vue +++ b/web/pages/settings/index.vue @@ -443,7 +443,7 @@ Documentation From 1d291f313c19b31b5177ffe85328d5e039d4ecb3 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:26:24 +0300 Subject: [PATCH 07/10] feat: add OpenTelemetry tracing to MongoDB client Use otelmongo.NewMonitor() as the command monitor on the MongoDB client to automatically trace all database operations. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/go.mod | 7 ++++--- api/go.sum | 14 ++++++++------ api/pkg/repositories/mongodb.go | 10 +++++++--- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/api/go.mod b/api/go.mod index de1338cd..1fe7dca1 100644 --- a/api/go.mod +++ b/api/go.mod @@ -47,6 +47,7 @@ require ( github.com/uptrace/uptrace-go v1.43.0 github.com/xuri/excelize/v2 v2.10.1 go.mongodb.org/mongo-driver/v2 v2.6.0 + go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 @@ -187,12 +188,12 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.28.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/crypto v0.50.0 // indirect + golang.org/x/crypto v0.51.0 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.36.0 // indirect + golang.org/x/sys v0.44.0 // indirect + golang.org/x/text v0.37.0 // indirect golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.44.0 // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/api/go.sum b/api/go.sum index be12cc63..fa1163a2 100644 --- a/api/go.sum +++ b/api/go.sum @@ -359,6 +359,8 @@ go.opentelemetry.io/contrib v1.43.0 h1:rv+pngknCr4qpZDxSpEvEoRioutgfbkk82x6MChJQ go.opentelemetry.io/contrib v1.43.0/go.mod h1:JYdNU7Pl/2ckKMGp8/G7zeyhEbtRmy9Q8bcrtv75Znk= go.opentelemetry.io/contrib/detectors/gcp v1.43.0 h1:62yY3dT7/ShwOxzA0RsKRgshBmfElKI4d/Myu2OxDFU= go.opentelemetry.io/contrib/detectors/gcp v1.43.0/go.mod h1:RyaZMFY7yi1kAs45S6mbFGz8O8rqB0dTY14uzvG4LCs= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e h1:OX282aWfZNOrSVUPF59HlRhyA+MDcyi4kI8WWXt6A8I= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo v0.0.0-20260513205827-ba143fc95a5e/go.mod h1:lw7VQzmNsmkZBRQqOQiREGxO3GtzG/pOVEmKufablmA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 h1:0Qx7VGBacMm9ZENQ7TnNObTYI4ShC+lHI16seduaxZo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0/go.mod h1:Sje3i3MjSPKTSPvVWCaL8ugBzJwik3u4smCjUeuupqg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY= @@ -418,8 +420,8 @@ golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliY golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= -golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= -golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI= +golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8= golang.org/x/image v0.25.0 h1:Y6uW6rH1y5y/LK1J8BPWZtr6yZ7hrsy6hFrXjgsc2fQ= golang.org/x/image v0.25.0/go.mod h1:tCAmOEGthTtkalusGp1g3xa2gke8J6c2N565dTyl9Rs= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -464,8 +466,8 @@ golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= -golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ= +golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -485,8 +487,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= -golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= +golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc= +golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38= golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go index bbad0c07..bb884148 100644 --- a/api/pkg/repositories/mongodb.go +++ b/api/pkg/repositories/mongodb.go @@ -8,11 +8,11 @@ import ( "time" "github.com/google/uuid" + "github.com/palantir/stacktrace" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" - - "github.com/palantir/stacktrace" + "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/v2/mongo/otelmongo" ) const ( @@ -61,7 +61,11 @@ func NewMongoDB(uri string) (*mongo.Database, error) { serverAPI := options.ServerAPI(options.ServerAPIVersion1) registry := newMongoRegistry() - opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI).SetRegistry(registry) + opts := options.Client(). + ApplyURI(uri). + SetServerAPIOptions(serverAPI). + SetRegistry(registry). + SetMonitor(otelmongo.NewMonitor()) client, err := mongo.Connect(opts) if err != nil { From cdac79a69f201ec9b5e8f766dc00522d322f5fa6 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:28:30 +0300 Subject: [PATCH 08/10] fix: address PR review comments - Remove MongoDB URI from Ping error message to prevent credential exposure in logs and error aggregators - Use separate contexts for Ping (10s) and index creation (30s) so they don't share a timeout budget Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- api/pkg/repositories/mongodb.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/api/pkg/repositories/mongodb.go b/api/pkg/repositories/mongodb.go index bb884148..f2f65ea9 100644 --- a/api/pkg/repositories/mongodb.go +++ b/api/pkg/repositories/mongodb.go @@ -56,8 +56,8 @@ func NewMongoDB(uri string) (*mongo.Database, error) { return nil, stacktrace.Propagate(err, "cannot parse database name from MongoDB URI") } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + pingCtx, pingCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer pingCancel() serverAPI := options.ServerAPI(options.ServerAPIVersion1) registry := newMongoRegistry() @@ -72,13 +72,16 @@ func NewMongoDB(uri string) (*mongo.Database, error) { return nil, stacktrace.Propagate(err, "cannot connect to MongoDB Atlas") } - if err = client.Ping(ctx, nil); err != nil { - return nil, stacktrace.Propagate(err, fmt.Sprintf("cannot ping MongoDB with URI [%s]", uri)) + if err = client.Ping(pingCtx, nil); err != nil { + return nil, stacktrace.Propagate(err, "cannot ping MongoDB Atlas") } db := client.Database(dbName) - if err = createMongoIndexes(ctx, db); err != nil { + indexCtx, indexCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer indexCancel() + + if err = createMongoIndexes(indexCtx, db); err != nil { return nil, stacktrace.Propagate(err, "cannot create MongoDB indexes") } From e0f4d241a88c8d1fe97bfb30f41cee672c518049 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:31:31 +0300 Subject: [PATCH 09/10] fix: update send schedule docs link to correct URL Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- web/pages/settings/index.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/pages/settings/index.vue b/web/pages/settings/index.vue index 2b4348ca..96aa83f3 100644 --- a/web/pages/settings/index.vue +++ b/web/pages/settings/index.vue @@ -385,7 +385,7 @@ delivered when the schedule opens according to your configured send rate.

From 6e45a28d161b37d3629ec916a1e0b93f17eac997 Mon Sep 17 00:00:00 2001 From: Acho Arnold Date: Sat, 16 May 2026 13:37:57 +0300 Subject: [PATCH 10/10] fix: update CI workflow to check MongoDB instead of sqld Replace the sqld health check with a MongoDB mongosh ping check since the test infrastructure now uses mongo:7 instead of sqld. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .github/workflows/api.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/api.yml b/.github/workflows/api.yml index e8044d5c..849f5ec8 100644 --- a/.github/workflows/api.yml +++ b/.github/workflows/api.yml @@ -37,18 +37,18 @@ jobs: - name: Wait for services to be healthy working-directory: ./tests run: | - echo "Waiting for sqld to be healthy..." + echo "Waiting for MongoDB to be healthy..." for i in $(seq 1 20); do - if curl -sf http://localhost:8090/health >/dev/null 2>&1; then - echo "sqld is healthy!" + if docker compose exec mongodb mongosh --eval "db.runCommand('ping').ok" --quiet >/dev/null 2>&1; then + echo "MongoDB is healthy!" break fi if [ $i -eq 20 ]; then - echo "sqld failed to become healthy" - docker compose logs sqld + echo "MongoDB failed to become healthy" + docker compose logs mongodb exit 1 fi - echo "sqld attempt $i/20 - waiting 3s..." + echo "MongoDB attempt $i/20 - waiting 3s..." sleep 3 done