Add LoadOrStore and LoadAndDelete (atomic insert-if-absent and read-and-consume)#26
Add LoadOrStore and LoadAndDelete (atomic insert-if-absent and read-and-consume)#26mlwelles wants to merge 21 commits into
Conversation
Add a generic typed layer over modusgraph.Client: typed.Client[T] with CRUD and iterators; a fluent Query[T] builder (filters, ordering, paging, edge traversal, IterNodes); MultiQuery for N homogeneous blocks in one round-trip; functional options; a filter DSL (typed/filter); and ordered result merging (typed/search). A small no-op-by-default Tracer seam (typed.SetTracer) lets a host plug in tracing without the typed package depending on any telemetry library. Self-contained: builds and tests against the current client with no other changes.
Add RetryPolicy / DefaultRetryPolicy and a runner that re-executes a function on aborted Dgraph transactions with exponential backoff (retry.go), exposed on the client via a WithRetry method.
Add the Schema interface (SchemaTypeName), the UnwrapSchema reflection helper, and the DgraphMapper interface (record.go). The client unwraps schema-defining values at the mutation and query boundary so generated wrapper types route to their backing schema struct. Plain structs do not implement Schema and are unaffected — UnwrapSchema is identity for them.
The unit-test job runs `go test -short`, which skips every test that needs a live Dgraph. Standing up a dgraph/standalone container (and setting MODUSGRAPH_TEST_ADDR) therefore adds setup the job never uses. Remove both; the integration and load suites keep their own dedicated jobs.
Add common local artifacts to .gitignore: editor config (.idea/, .vscode/), the built ./query binary, load_test benchmark JSON, and git worktrees.
Adds WithGRPCDialOption(opt grpc.DialOption), a general escape hatch for gRPC dial settings the dedicated options do not cover — TLS transport credentials, interceptors, keepalive, and so on — on remote (dgraph://) connections. The existing WithMaxRecvMsgSize is folded into the same dial-option assembly, so the two compose cleanly, and the client dedup key now counts the custom dial options so differently-configured clients are not merged. No change for embedded (file://) URIs.
Adds raw schema-DDL primitives that complement UpdateSchema's object-template inference: - Client.AlterSchema(ctx, schema) applies a raw DQL schema string directly, giving full control over predicate types, indexes, and directives — useful for migrations that declare predicates no Go type models yet. - Engine.dropPredicate deletes a single predicate (and its data) from the embedded engine via posting.DeletePredicate. - embedded_client.go routes an Alter carrying DropAttr to dropPredicate, so the embedded path matches a remote Dgraph cluster's DropAttr behavior. TestDropPredicateEmbedded exercises the full declare/insert/drop cycle against the embedded engine.
Adds SelfValidator, an opt-in seam that lets a type drive its own validation. When a value passed to Insert, Upsert, or Update implements SelfValidator, the client calls ValidateWith instead of handing the value straight to the configured StructValidator. validateStruct now routes each element through a new validateOne helper that detects SelfValidator (on the value or its address) and otherwise falls back to StructCtx exactly as before — behavior is unchanged for ordinary structs. This is the runtime seam generated entities use to validate unexported fields: the generated ValidateWith builds a mirror struct with exported fields the go-playground validator can read by reflection.
Add TestLoadAndDeleteSingleWinner and serialize LoadAndDelete's read-then-delete critical section with a per-client mutex so exactly one in-process caller consumes a node. The embedded engine's commit path does no optimistic-concurrency conflict check, so the shared read-write transaction alone cannot abort losers; the lock guarantees single-winner semantics regardless of backend.
There was a problem hiding this comment.
10 issues found across 31 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="self_validator_test.go">
<violation number="1" location="self_validator_test.go:57">
P2: Slice self-validator test does not assert that `StructCtx` was never called, unlike the analogous scalar test `TestValidateRoutesToSelfValidator`. A regression where slice handling incorrectly invokes both paths would still pass.</violation>
</file>
<file name="dial_options_test.go">
<violation number="1" location="dial_options_test.go:23">
P2: TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.</violation>
</file>
<file name="retry.go">
<violation number="1" location="retry.go:77">
P2: Negative `MaxRetries` can skip the loop and hit the supposedly unreachable `panic("unreachable")`. When `MaxRetries` is `-1`, `range policy.MaxRetries + 1` becomes `range 0`, which executes zero iterations. Execution falls through to the `panic` instead of calling `fn()` or returning an error.</violation>
</file>
<file name="client.go">
<violation number="1" location="client.go:497">
P1: Client cache key can collide for different gRPC dial option sets because it keys on option count only</violation>
<violation number="2" location="client.go:648">
P2: Nil pointer input can panic in `firstUpsertPredicate` via reflection dereferencing loop</violation>
<violation number="3" location="client.go:722">
P2: Global client-wide mutex `consumeMu` over-serializes `LoadAndDelete` across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.</violation>
</file>
<file name="typed/query.go">
<violation number="1" location="typed/query.go:388">
P2: NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods</violation>
</file>
<file name="record.go">
<violation number="1" location="record.go:35">
P2: Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.</violation>
</file>
<file name="consume_test.go">
<violation number="1" location="consume_test.go:44">
P2: TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.</violation>
</file>
<file name="retry_test.go">
<violation number="1" location="retry_test.go:115">
P2: `TestWithRetryContextCancellation` claims to verify context cancellation during backoff sleep, but the callback returns `ctx.Err()` (not `dgo.ErrAborted`), so `WithRetry` returns immediately and never enters the backoff/retry loop. The `select` block handling `ctx.Done()` during delay is never exercised, providing false confidence.</violation>
</file>
Tip: cubic can generate docs of your entire codebase and keep them up to date. Try it here.
Re-trigger cubic
| return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s:%d", c.uri, c.options.autoSchema, c.options.poolSize, | ||
| c.options.maxEdgeTraversal, c.options.cacheSizeMB, c.options.maxRecvMsgSize, | ||
| c.options.namespace, validatorKey, embeddingKey) | ||
| c.options.namespace, validatorKey, embeddingKey, len(c.options.grpcDialOptions)) |
There was a problem hiding this comment.
P1: Client cache key can collide for different gRPC dial option sets because it keys on option count only
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 497:
<comment>Client cache key can collide for different gRPC dial option sets because it keys on option count only</comment>
<file context>
@@ -430,9 +492,9 @@ func (c client) key() string {
+ return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s:%d", c.uri, c.options.autoSchema, c.options.poolSize,
c.options.maxEdgeTraversal, c.options.cacheSizeMB, c.options.maxRecvMsgSize,
- c.options.namespace, validatorKey, embeddingKey)
+ c.options.namespace, validatorKey, embeddingKey, len(c.options.grpcDialOptions))
}
</file context>
| @@ -0,0 +1,65 @@ | |||
| /* | |||
There was a problem hiding this comment.
P2: Slice self-validator test does not assert that StructCtx was never called, unlike the analogous scalar test TestValidateRoutesToSelfValidator. A regression where slice handling incorrectly invokes both paths would still pass.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At self_validator_test.go, line 57:
<comment>Slice self-validator test does not assert that `StructCtx` was never called, unlike the analogous scalar test `TestValidateRoutesToSelfValidator`. A regression where slice handling incorrectly invokes both paths would still pass.</comment>
<file context>
@@ -0,0 +1,65 @@
+ }
+}
+
+func TestValidateSelfValidatorInSlice(t *testing.T) {
+ rv := &recordingValidator{}
+ c := client{options: clientOptions{validator: rv}}
</file context>
| } | ||
| } | ||
|
|
||
| func TestKeyDistinguishesGRPCDialOptions(t *testing.T) { |
There was a problem hiding this comment.
P2: TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At dial_options_test.go, line 23:
<comment>TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.</comment>
<file context>
@@ -0,0 +1,30 @@
+ }
+}
+
+func TestKeyDistinguishesGRPCDialOptions(t *testing.T) {
+ base := client{uri: "dgraph://localhost:9080"}
+ withOpt := client{uri: "dgraph://localhost:9080"}
</file context>
| // return client.Insert(ctx, &entity) | ||
| // }) | ||
| func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error { | ||
| for attempt := range policy.MaxRetries + 1 { |
There was a problem hiding this comment.
P2: Negative MaxRetries can skip the loop and hit the supposedly unreachable panic("unreachable"). When MaxRetries is -1, range policy.MaxRetries + 1 becomes range 0, which executes zero iterations. Execution falls through to the panic instead of calling fn() or returning an error.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 77:
<comment>Negative `MaxRetries` can skip the loop and hit the supposedly unreachable `panic("unreachable")`. When `MaxRetries` is `-1`, `range policy.MaxRetries + 1` becomes `range 0`, which executes zero iterations. Execution falls through to the `panic` instead of calling `fn()` or returning an error.</comment>
<file context>
@@ -0,0 +1,96 @@
+// return client.Insert(ctx, &entity)
+// })
+func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
+ for attempt := range policy.MaxRetries + 1 {
+ err := fn()
+ if err == nil {
</file context>
| // no commit-time conflict check, so without this lock concurrent callers | ||
| // would each read the node and each report loaded=true. The lock makes | ||
| // read-and-consume atomic regardless of backend. | ||
| if c.consumeMu != nil { |
There was a problem hiding this comment.
P2: Global client-wide mutex consumeMu over-serializes LoadAndDelete across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 722:
<comment>Global client-wide mutex `consumeMu` over-serializes `LoadAndDelete` across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.</comment>
<file context>
@@ -528,9 +609,182 @@ func (c client) Upsert(ctx context.Context, obj any, predicates ...string) error
+ // no commit-time conflict check, so without this lock concurrent callers
+ // would each read the node and each report loaded=true. The lock makes
+ // read-and-consume atomic regardless of backend.
+ if c.consumeMu != nil {
+ c.consumeMu.Lock()
+ defer c.consumeMu.Unlock()
</file context>
| // field exists. | ||
| func firstUpsertPredicate(obj any) string { | ||
| v := reflect.ValueOf(obj) | ||
| for v.Kind() == reflect.Ptr { |
There was a problem hiding this comment.
P2: Nil pointer input can panic in firstUpsertPredicate via reflection dereferencing loop
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 648:
<comment>Nil pointer input can panic in `firstUpsertPredicate` via reflection dereferencing loop</comment>
<file context>
@@ -528,9 +609,182 @@ func (c client) Upsert(ctx context.Context, obj any, predicates ...string) error
+// field exists.
+func firstUpsertPredicate(obj any) string {
+ v := reflect.ValueOf(obj)
+ for v.Kind() == reflect.Ptr {
+ v = v.Elem()
+ }
</file context>
| // NodesAndCount executes the query and returns the matching records together | ||
| // with the total count (useful for pagination totals). Like Nodes, it runs the | ||
| // WhereEdge pre-pass first when edge constraints are present. | ||
| func (qb *Query[T]) NodesAndCount() ([]T, int, error) { |
There was a problem hiding this comment.
P2: NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At typed/query.go, line 388:
<comment>NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods</comment>
<file context>
@@ -0,0 +1,565 @@
+// NodesAndCount executes the query and returns the matching records together
+// with the total count (useful for pagination totals). Like Nodes, it runs the
+// WhereEdge pre-pass first when edge constraints are present.
+func (qb *Query[T]) NodesAndCount() ([]T, int, error) {
+ matched, err := qb.resolveRoots()
+ if err != nil {
</file context>
| // reflection probe finds Unwrap(), calls it, gets an error, fails the | ||
| // Schema check, and returns the original obj. | ||
| func UnwrapSchema(obj any) any { | ||
| if obj == nil { |
There was a problem hiding this comment.
P2: Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At record.go, line 35:
<comment>Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.</comment>
<file context>
@@ -0,0 +1,58 @@
+// reflection probe finds Unwrap(), calls it, gets an error, fails the
+// Schema check, and returns the original obj.
+func UnwrapSchema(obj any) any {
+ if obj == nil {
+ return obj
+ }
</file context>
| t.Fatal("first store: want loaded=false (newly created)") | ||
| } | ||
|
|
||
| second := &consumeJTI{JTI: "abc"} |
There was a problem hiding this comment.
P2: TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At consume_test.go, line 44:
<comment>TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.</comment>
<file context>
@@ -0,0 +1,89 @@
+ t.Fatal("first store: want loaded=false (newly created)")
+ }
+
+ second := &consumeJTI{JTI: "abc"}
+ loaded, err = conn.LoadOrStore(ctx, second, "jti")
+ if err != nil {
</file context>
| } | ||
|
|
||
| callCount := 0 | ||
| err := client.WithRetry(ctx, slowPolicy, func() error { |
There was a problem hiding this comment.
P2: TestWithRetryContextCancellation claims to verify context cancellation during backoff sleep, but the callback returns ctx.Err() (not dgo.ErrAborted), so WithRetry returns immediately and never enters the backoff/retry loop. The select block handling ctx.Done() during delay is never exercised, providing false confidence.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry_test.go, line 115:
<comment>`TestWithRetryContextCancellation` claims to verify context cancellation during backoff sleep, but the callback returns `ctx.Err()` (not `dgo.ErrAborted`), so `WithRetry` returns immediately and never enters the backoff/retry loop. The `select` block handling `ctx.Done()` during delay is never exercised, providing false confidence.</comment>
<file context>
@@ -0,0 +1,197 @@
+ }
+
+ callCount := 0
+ err := client.WithRetry(ctx, slowPolicy, func() error {
+ callCount++
+ // Always return an error that looks like an abort to trigger retry.
</file context>
What
Two atomic key-value verbs mirroring
sync.Map, at both theClientinterface and the generictyped.Client[T]:LoadOrStore(ctx, obj, predicates...) (loaded bool, err error)— insert-if-absent.loaded=truemeans a node already matched the upsert predicates (andobjis populated from it). Built on dgmanMutateOrGet.LoadAndDelete(ctx, obj, key, predicates...) (loaded bool, err error)— atomic read-and-consume. Reads the node whose key predicate equalskeyintoobj, deletes it, and reports whether one was present.The typed layer returns
(*T, bool, error)and wraps each call in a tracing span like the existing methods.Why
These surface test-and-set semantics the existing idempotent verbs intentionally discard: "did I create this?" (replay / uniqueness detection) and "read-and-consume exactly once" (single-use tokens). They are the equivalents of SQL
INSERT … ON CONFLICT DO NOTHINGandDELETE … RETURNING.Implementation note / tradeoff
LoadAndDeleteruns read-then-delete in one non-CommitNowtransaction and retries on abort, so against a real Dgraph cluster concurrent consumers conflict and exactly one wins. The embedded file-backed engine performs no commit-time conflict check, so a per-client mutex additionally serializes the read-delete critical section to guarantee single-winner semantics in-process regardless of backend. The lock is coarse (one per client); a per-key/sharded lock, or gating it to embedded mode only, would reduce contention — happy to refine based on review.Tests
-race.Summary by cubic
Adds
LoadOrStore(insert-if-absent) andLoadAndDelete(atomic read-and-consume) toClientandtyped.Client[T]for idempotency and single-use token flows. Also introduces a generic typed layer with a fluent query builder, a retry policy for aborted transactions, and raw schema DDL helpers.Client.LoadOrStore,Client.LoadAndDelete; typed variants return(*T, bool, error)with single-winner semantics across backends.typed.Client[T],Query[T](filters, ordering, paging, edge traversal, iterators),MultiQuery,typed/filter, andtyped/search.MergeByID.RetryPolicyandClient.WithRetrywith exponential backoff and jitter on aborted transactions.Client.AlterSchema, embeddedDropAttrsupport, generated-type routing viaSchema/UnwrapSchema, andSelfValidatorfor per-type validation.typed.SetTracer(no-op by default) andWithGRPCDialOptionfor custom gRPC dial settings.Written for commit 4ad3512. Summary will update on new commits.