Add LoadOrStore and LoadAndDelete (atomic insert-if-absent and read-and-consume) #26

Open
mlwelles wants to merge 21 commits into matthewmcneely:main from mlwelles:feature/load-and-delete-load-or-store


mlwelles commented Jun 8, 2026


What

Two atomic key-value verbs mirroring sync.Map, at both the Client interface and the generic typed.Client[T]:

  • LoadOrStore(ctx, obj, predicates...) (loaded bool, err error) — insert-if-absent. loaded=true means a node already matched the upsert predicates (and obj is populated from it). Built on dgman MutateOrGet.
  • LoadAndDelete(ctx, obj, key, predicates...) (loaded bool, err error) — atomic read-and-consume. Reads the node whose key predicate equals key into obj, deletes it, and reports whether one was present.

The typed layer returns (*T, bool, error) and wraps each call in a tracing span like the existing methods.
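
The sync.Map behavior these verbs mirror can be sketched with the standard library alone. The `tokens` type and its `Claim` and `Consume` methods are hypothetical names used only for illustration; they are not part of this PR's API and involve no Dgraph calls.

```go
package main

import (
	"fmt"
	"sync"
)

// tokens is a hypothetical wrapper used only to illustrate the semantics.
type tokens struct{ m sync.Map }

// Claim stores value under key if the key is absent. It reports whether this
// call created the entry, which is the inverse of sync.Map's "loaded" flag.
func (t *tokens) Claim(key, value string) bool {
	_, loaded := t.m.LoadOrStore(key, value)
	return !loaded
}

// Consume removes the entry and returns its value. Only the first call for a
// given key reports ok=true; later calls find nothing.
func (t *tokens) Consume(key string) (string, bool) {
	v, loaded := t.m.LoadAndDelete(key)
	if !loaded {
		return "", false
	}
	return v.(string), true
}

func main() {
	t := &tokens{}
	fmt.Println(t.Claim("jti-abc", "first"))  // true: newly stored
	fmt.Println(t.Claim("jti-abc", "second")) // false: already present
	v, ok := t.Consume("jti-abc")
	fmt.Println(v, ok) // first true
	_, ok = t.Consume("jti-abc")
	fmt.Println(ok) // false
}
```

The second `Claim` leaves the original value in place, which matches the "loaded=true means a node already matched" wording above.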

Why

These surface test-and-set semantics the existing idempotent verbs intentionally discard: "did I create this?" (replay / uniqueness detection) and "read-and-consume exactly once" (single-use tokens). They are the equivalents of SQL INSERT … ON CONFLICT DO NOTHING and DELETE … RETURNING.

Implementation note / tradeoff

LoadAndDelete runs read-then-delete in one non-CommitNow transaction and retries on abort, so against a real Dgraph cluster concurrent consumers conflict and exactly one wins. The embedded file-backed engine performs no commit-time conflict check, so a per-client mutex additionally serializes the read-delete critical section to guarantee single-winner semantics in-process regardless of backend. The lock is coarse (one per client); a per-key/sharded lock, or gating it to embedded mode only, would reduce contention — happy to refine based on review.
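
As a rough illustration of the locking described above, the sketch below serializes a read-then-delete section with one coarse mutex and races 8 goroutines against a single key. The `store` type is a hypothetical in-memory stand-in for a backend without a commit-time conflict check; it is not the embedded engine or the client from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory backend: individual reads and deletes
// are safe, but nothing stops two callers from both reading before either
// deletes unless consumeMu is held across both steps.
type store struct {
	dataMu sync.Mutex // protects data for single operations only
	data   map[string]string

	consumeMu sync.Mutex // serializes the whole read-then-delete section
}

func (s *store) get(key string) (string, bool) {
	s.dataMu.Lock()
	defer s.dataMu.Unlock()
	v, ok := s.data[key]
	return v, ok
}

func (s *store) del(key string) {
	s.dataMu.Lock()
	defer s.dataMu.Unlock()
	delete(s.data, key)
}

// loadAndDelete holds consumeMu across the read and the delete, so only one
// caller can observe the value before it is removed.
func (s *store) loadAndDelete(key string) (string, bool) {
	s.consumeMu.Lock()
	defer s.consumeMu.Unlock()
	v, ok := s.get(key)
	if !ok {
		return "", false
	}
	s.del(key)
	return v, true
}

// race starts n goroutines that all try to consume the same key and returns
// how many of them saw loaded=true.
func race(n int) int {
	s := &store{data: map[string]string{"token": "secret"}}
	var wg sync.WaitGroup
	var winMu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := s.loadAndDelete("token"); ok {
				winMu.Lock()
				winners++
				winMu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println("winners:", race(8)) // winners: 1
}
```

Because the lock is held for the whole section, consumers of unrelated keys also wait on each other, which is the contention tradeoff noted above.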

Tests

  • Substrate and typed tests for both verbs (insert-if-absent, read-and-consume, single-use).
  • A concurrency test asserting exactly one winner under 8 racers, passing under -race.

Summary by cubic

Adds LoadOrStore (insert-if-absent) and LoadAndDelete (atomic read-and-consume) to Client and typed.Client[T] for idempotency and single-use token flows. Also introduces a generic typed layer with a fluent query builder, a retry policy for aborted transactions, and raw schema DDL helpers.

  • New Features
    • Atomic verbs: Client.LoadOrStore, Client.LoadAndDelete; typed variants return (*T, bool, error) with single-winner semantics across backends.
    • Typed layer: typed.Client[T], Query[T] (filters, ordering, paging, edge traversal, iterators), MultiQuery, typed/filter, and typed/search.MergeByID.
    • Retry: RetryPolicy and Client.WithRetry with exponential backoff and jitter on aborted transactions.
    • Schema and validation: Client.AlterSchema, embedded DropAttr support, generated-type routing via Schema/UnwrapSchema, and SelfValidator for per-type validation.
    • Tracing and connectivity: typed.SetTracer (no-op by default) and WithGRPCDialOption for custom gRPC dial settings.

Written for commit 4ad3512. Summary will update on new commits.

mlwelles added 21 commits June 4, 2026 13:06
Add a generic typed layer over modusgraph.Client: typed.Client[T] with
CRUD and iterators; a fluent Query[T] builder (filters, ordering, paging,
edge traversal, IterNodes); MultiQuery for N homogeneous blocks in one
round-trip; functional options; a filter DSL (typed/filter); and ordered
result merging (typed/search).

A small no-op-by-default Tracer seam (typed.SetTracer) lets a host plug in
tracing without the typed package depending on any telemetry library.

Self-contained: builds and tests against the current client with no other
changes.

Add RetryPolicy / DefaultRetryPolicy and a runner that re-executes a
function on aborted Dgraph transactions with exponential backoff
(retry.go), exposed on the client via a WithRetry method.

Add the Schema interface (SchemaTypeName), the UnwrapSchema reflection
helper, and the DgraphMapper interface (record.go). The client unwraps
schema-defining values at the mutation and query boundary so generated
wrapper types route to their backing schema struct. Plain structs do not
implement Schema and are unaffected: UnwrapSchema is the identity for them.

The unit-test job runs `go test -short`, which skips every test that needs
a live Dgraph. Standing up a dgraph/standalone container (and setting
MODUSGRAPH_TEST_ADDR) therefore adds setup the job never uses. Remove both;
the integration and load suites keep their own dedicated jobs.

Add common local artifacts to .gitignore: editor config (.idea/, .vscode/),
the built ./query binary, load_test benchmark JSON, and git worktrees.

Adds WithGRPCDialOption(opt grpc.DialOption), a general escape hatch for
gRPC dial settings the dedicated options do not cover — TLS transport
credentials, interceptors, keepalive, and so on — on remote (dgraph://)
connections.

The existing WithMaxRecvMsgSize is folded into the same dial-option
assembly, so the two compose cleanly, and the client dedup key now counts
the custom dial options so differently-configured clients are not merged.
No change for embedded (file://) URIs.

Adds raw schema-DDL primitives that complement UpdateSchema's
object-template inference:

- Client.AlterSchema(ctx, schema) applies a raw DQL schema string directly,
  giving full control over predicate types, indexes, and directives — useful
  for migrations that declare predicates no Go type models yet.
- Engine.dropPredicate deletes a single predicate (and its data) from the
  embedded engine via posting.DeletePredicate.
- embedded_client.go routes an Alter carrying DropAttr to dropPredicate, so
  the embedded path matches a remote Dgraph cluster's DropAttr behavior.

TestDropPredicateEmbedded exercises the full declare/insert/drop cycle
against the embedded engine.

Adds SelfValidator, an opt-in seam that lets a type drive its own
validation. When a value passed to Insert, Upsert, or Update implements
SelfValidator, the client calls ValidateWith instead of handing the value
straight to the configured StructValidator.

validateStruct now routes each element through a new validateOne helper that
detects SelfValidator (on the value or its address) and otherwise falls back
to StructCtx exactly as before — behavior is unchanged for ordinary structs.

This is the runtime seam generated entities use to validate unexported
fields: the generated ValidateWith builds a mirror struct with exported
fields the go-playground validator can read by reflection.

Add TestLoadAndDeleteSingleWinner and serialize LoadAndDelete's
read-then-delete critical section with a per-client mutex so exactly one
in-process caller consumes a node. The embedded engine's commit path does no
optimistic-concurrency conflict check, so the shared read-write transaction
alone cannot abort losers; the lock guarantees single-winner semantics
regardless of backend.

mlwelles requested a review from matthewmcneely as a code owner June 8, 2026 05:28

cubic-dev-ai (bot) left a comment

10 issues found across 31 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="self_validator_test.go">

<violation number="1" location="self_validator_test.go:57">
P2: Slice self-validator test does not assert that `StructCtx` was never called, unlike the analogous scalar test `TestValidateRoutesToSelfValidator`. A regression where slice handling incorrectly invokes both paths would still pass.</violation>
</file>

<file name="dial_options_test.go">

<violation number="1" location="dial_options_test.go:23">
P2: TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.</violation>
</file>

<file name="retry.go">

<violation number="1" location="retry.go:77">
P2: Negative `MaxRetries` can skip the loop and hit the supposedly unreachable `panic("unreachable")`. When `MaxRetries` is `-1`, `range policy.MaxRetries + 1` becomes `range 0`, which executes zero iterations. Execution falls through to the `panic` instead of calling `fn()` or returning an error.</violation>
</file>

<file name="client.go">

<violation number="1" location="client.go:497">
P1: Client cache key can collide for different gRPC dial option sets because it keys on option count only</violation>

<violation number="2" location="client.go:648">
P2: Nil pointer input can panic in `firstUpsertPredicate` via reflection dereferencing loop</violation>

<violation number="3" location="client.go:722">
P2: Global client-wide mutex `consumeMu` over-serializes `LoadAndDelete` across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.</violation>
</file>

<file name="typed/query.go">

<violation number="1" location="typed/query.go:388">
P2: NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods</violation>
</file>

<file name="record.go">

<violation number="1" location="record.go:35">
P2: Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.</violation>
</file>

<file name="consume_test.go">

<violation number="1" location="consume_test.go:44">
P2: TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.</violation>
</file>

<file name="retry_test.go">

<violation number="1" location="retry_test.go:115">
P2: `TestWithRetryContextCancellation` claims to verify context cancellation during backoff sleep, but the callback returns `ctx.Err()` (not `dgo.ErrAborted`), so `WithRetry` returns immediately and never enters the backoff/retry loop. The `select` block handling `ctx.Done()` during delay is never exercised, providing false confidence.</violation>
</file>

Comment thread client.go
+ return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s:%d", c.uri, c.options.autoSchema, c.options.poolSize,
  c.options.maxEdgeTraversal, c.options.cacheSizeMB, c.options.maxRecvMsgSize,
- c.options.namespace, validatorKey, embeddingKey)
+ c.options.namespace, validatorKey, embeddingKey, len(c.options.grpcDialOptions))

P1: Client cache key can collide for different gRPC dial option sets because it keys on option count only
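
The collision can be shown with a small self-contained sketch. Real gRPC dial options are opaque, so `labeledOption` below is a hypothetical stand-in that assumes each option carries a caller-supplied label; `keyByCount` and `keyByLabels` are likewise illustrative names, not functions from this PR.

```go
package main

import (
	"fmt"
	"strings"
)

// labeledOption is hypothetical: it stands in for a dial option that carries
// a caller-supplied label, since real gRPC dial options cannot be compared.
type labeledOption struct{ label string }

// keyByCount mimics keying on len(options) only.
func keyByCount(uri string, opts []labeledOption) string {
	return fmt.Sprintf("%s:%d", uri, len(opts))
}

// keyByLabels folds each option's quoted label into the key, so two option
// sets of the same length still differ when their labels differ.
func keyByLabels(uri string, opts []labeledOption) string {
	labels := make([]string, len(opts))
	for i, o := range opts {
		labels[i] = fmt.Sprintf("%q", o.label)
	}
	return fmt.Sprintf("%s:[%s]", uri, strings.Join(labels, ","))
}

func main() {
	uri := "dgraph://localhost:9080"
	tls := []labeledOption{{"tls"}}
	insecure := []labeledOption{{"insecure"}}
	fmt.Println(keyByCount(uri, tls) == keyByCount(uri, insecure))   // true: collision
	fmt.Println(keyByLabels(uri, tls) == keyByLabels(uri, insecure)) // false
}
```

Whether a label, a per-client identity, or simply skipping deduplication for clients with custom dial options fits best is a design choice for the author; the sketch only demonstrates why a count alone is not enough.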

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 497:

<comment>Client cache key can collide for different gRPC dial option sets because it keys on option count only</comment>

<file context>
@@ -430,9 +492,9 @@ func (c client) key() string {
+	return fmt.Sprintf("%s:%t:%d:%d:%d:%d:%s:%s:%s:%d", c.uri, c.options.autoSchema, c.options.poolSize,
 		c.options.maxEdgeTraversal, c.options.cacheSizeMB, c.options.maxRecvMsgSize,
-		c.options.namespace, validatorKey, embeddingKey)
+		c.options.namespace, validatorKey, embeddingKey, len(c.options.grpcDialOptions))
 }
 
</file context>

Comment thread self_validator_test.go
@@ -0,0 +1,65 @@
/*

P2: Slice self-validator test does not assert that StructCtx was never called, unlike the analogous scalar test TestValidateRoutesToSelfValidator. A regression where slice handling incorrectly invokes both paths would still pass.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At self_validator_test.go, line 57:

<comment>Slice self-validator test does not assert that `StructCtx` was never called, unlike the analogous scalar test `TestValidateRoutesToSelfValidator`. A regression where slice handling incorrectly invokes both paths would still pass.</comment>

<file context>
@@ -0,0 +1,65 @@
+	}
+}
+
+func TestValidateSelfValidatorInSlice(t *testing.T) {
+	rv := &recordingValidator{}
+	c := client{options: clientOptions{validator: rv}}
</file context>

Comment thread dial_options_test.go
}
}

func TestKeyDistinguishesGRPCDialOptions(t *testing.T) {

P2: TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At dial_options_test.go, line 23:

<comment>TestKeyDistinguishesGRPCDialOptions is too weak: it only verifies empty-vs-non-empty gRPC dial options and would not catch collisions between differently configured clients that have the same option count.</comment>

<file context>
@@ -0,0 +1,30 @@
+	}
+}
+
+func TestKeyDistinguishesGRPCDialOptions(t *testing.T) {
+	base := client{uri: "dgraph://localhost:9080"}
+	withOpt := client{uri: "dgraph://localhost:9080"}
</file context>

Comment thread retry.go
// return client.Insert(ctx, &entity)
// })
func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
for attempt := range policy.MaxRetries + 1 {

P2: Negative MaxRetries can skip the loop and hit the supposedly unreachable panic("unreachable"). When MaxRetries is -1, range policy.MaxRetries + 1 becomes range 0, which executes zero iterations. Execution falls through to the panic instead of calling fn() or returning an error.
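
One possible shape for a fix is to clamp the retry count before looping, so the callback always runs at least once and no unreachable branch is needed. This is a minimal sketch, not the PR's `WithRetry`: `withRetry` and `errAborted` are hypothetical names, and backoff is omitted to keep the control flow visible.

```go
package main

import (
	"errors"
	"fmt"
)

// errAborted is a hypothetical stand-in for the aborted-transaction error.
var errAborted = errors.New("transaction aborted")

// withRetry calls fn once and then up to maxRetries more times while fn
// keeps returning errAborted. A negative maxRetries is treated as zero, so
// fn always runs at least once and the function always returns normally.
func withRetry(maxRetries int, fn func() error) error {
	if maxRetries < 0 {
		maxRetries = 0
	}
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		err = fn()
		if err == nil || !errors.Is(err, errAborted) {
			return err
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxRetries+1, err)
}

func main() {
	calls := 0
	err := withRetry(-1, func() error { calls++; return nil })
	fmt.Println(calls, err) // 1 <nil>
}
```

Rejecting a negative value with an error at policy-construction time would be an equally reasonable alternative.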

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 77:

<comment>Negative `MaxRetries` can skip the loop and hit the supposedly unreachable `panic("unreachable")`. When `MaxRetries` is `-1`, `range policy.MaxRetries + 1` becomes `range 0`, which executes zero iterations. Execution falls through to the `panic` instead of calling `fn()` or returning an error.</comment>

<file context>
@@ -0,0 +1,96 @@
+//	    return client.Insert(ctx, &entity)
+//	})
+func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
+	for attempt := range policy.MaxRetries + 1 {
+		err := fn()
+		if err == nil {
</file context>

Comment thread client.go
// no commit-time conflict check, so without this lock concurrent callers
// would each read the node and each report loaded=true. The lock makes
// read-and-consume atomic regardless of backend.
if c.consumeMu != nil {

P2: Global client-wide mutex consumeMu over-serializes LoadAndDelete across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.
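
The per-key/sharded option mentioned above could look roughly like the following striped lock. `stripedMutex`, its stripe count of 32, and the FNV hash are assumptions chosen for illustration; none of them come from this PR.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// stripedMutex is a hypothetical fixed-size set of locks. A key always maps
// to the same stripe, so same-key callers serialize while callers whose keys
// hash to different stripes proceed in parallel.
type stripedMutex struct {
	stripes [32]sync.Mutex
}

// index maps key to a stripe number in [0, len(stripes)).
func (s *stripedMutex) index(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(len(s.stripes)))
}

// lock acquires the stripe for key and returns the matching unlock function.
func (s *stripedMutex) lock(key string) (unlock func()) {
	m := &s.stripes[s.index(key)]
	m.Lock()
	return m.Unlock
}

func main() {
	var locks stripedMutex
	unlock := locks.lock("jti-abc")
	// The read-then-delete for "jti-abc" would run here.
	unlock()
	fmt.Println(locks.index("jti-abc") == locks.index("jti-abc")) // true
}
```

Two distinct keys can still share a stripe, so this reduces contention rather than eliminating it. Gating the lock to embedded mode is a separate, complementary change.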

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 722:

<comment>Global client-wide mutex `consumeMu` over-serializes `LoadAndDelete` across all keys and backends. The mutex is only needed for the embedded engine (which lacks commit-time conflict checks), but it is applied unconditionally, including for remote Dgraph clusters where transaction aborts already guarantee single-winner semantics. This creates unnecessary head-of-line blocking and throughput regression under concurrent consumers with different keys. Consider gating the lock to embedded mode, using a per-key/sharded lock, or shortening the critical section.</comment>

<file context>
@@ -528,9 +609,182 @@ func (c client) Upsert(ctx context.Context, obj any, predicates ...string) error
+	// no commit-time conflict check, so without this lock concurrent callers
+	// would each read the node and each report loaded=true. The lock makes
+	// read-and-consume atomic regardless of backend.
+	if c.consumeMu != nil {
+		c.consumeMu.Lock()
+		defer c.consumeMu.Unlock()
</file context>

Comment thread client.go
// field exists.
func firstUpsertPredicate(obj any) string {
v := reflect.ValueOf(obj)
for v.Kind() == reflect.Ptr {

P2: Nil pointer input can panic in firstUpsertPredicate via reflection dereferencing loop
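
A nil-safe version of the dereference loop might look like this. `structValue` and the `token` type are hypothetical; the sketch only shows the guard, not the predicate lookup that `firstUpsertPredicate` performs afterwards.

```go
package main

import (
	"fmt"
	"reflect"
)

// structValue follows pointers until it reaches a struct. It reports false
// for untyped nil, for nil pointers at any level, and for non-struct values,
// so callers never operate on an invalid reflect.Value.
func structValue(obj any) (reflect.Value, bool) {
	v := reflect.ValueOf(obj)
	for v.IsValid() && v.Kind() == reflect.Ptr {
		if v.IsNil() {
			return reflect.Value{}, false
		}
		v = v.Elem()
	}
	if !v.IsValid() || v.Kind() != reflect.Struct {
		return reflect.Value{}, false
	}
	return v, true
}

// token is a hypothetical entity type.
type token struct{ JTI string }

func main() {
	var nilTok *token
	_, ok := structValue(nilTok)
	fmt.Println(ok) // false
	v, ok := structValue(&token{JTI: "abc"})
	fmt.Println(ok, v.NumField()) // true 1
}
```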

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 648:

<comment>Nil pointer input can panic in `firstUpsertPredicate` via reflection dereferencing loop</comment>

<file context>
@@ -528,9 +609,182 @@ func (c client) Upsert(ctx context.Context, obj any, predicates ...string) error
+// field exists.
+func firstUpsertPredicate(obj any) string {
+	v := reflect.ValueOf(obj)
+	for v.Kind() == reflect.Ptr {
+		v = v.Elem()
+	}
</file context>

Comment thread typed/query.go
// NodesAndCount executes the query and returns the matching records together
// with the total count (useful for pagination totals). Like Nodes, it runs the
// WhereEdge pre-pass first when edge constraints are present.
func (qb *Query[T]) NodesAndCount() ([]T, int, error) {

P2: NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At typed/query.go, line 388:

<comment>NodesAndCount terminal is missing tracing instrumentation unlike other terminal methods</comment>

<file context>
@@ -0,0 +1,565 @@
+// NodesAndCount executes the query and returns the matching records together
+// with the total count (useful for pagination totals). Like Nodes, it runs the
+// WhereEdge pre-pass first when edge constraints are present.
+func (qb *Query[T]) NodesAndCount() ([]T, int, error) {
+	matched, err := qb.resolveRoots()
+	if err != nil {
</file context>

Comment thread record.go
// reflection probe finds Unwrap(), calls it, gets an error, fails the
// Schema check, and returns the original obj.
func UnwrapSchema(obj any) any {
if obj == nil {

P2: Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.
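
The underlying Go behavior is that a nil pointer stored in an interface value does not compare equal to nil, so an `obj == nil` check alone misses it. The helper below is a hypothetical sketch of an extra guard; `isNilValue` and `wrapper` are illustrative names.

```go
package main

import (
	"fmt"
	"reflect"
)

// isNilValue reports whether obj is an untyped nil, or a nil pointer, map,
// slice, channel, or function held inside a non-nil interface value.
func isNilValue(obj any) bool {
	if obj == nil {
		return true
	}
	v := reflect.ValueOf(obj)
	switch v.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Slice, reflect.Chan, reflect.Func:
		return v.IsNil()
	}
	return false
}

// wrapper is a hypothetical generated wrapper type.
type wrapper struct{ name string }

func main() {
	var w *wrapper
	var obj any = w
	fmt.Println(obj == nil)      // false
	fmt.Println(isNilValue(obj)) // true
}
```

Calling such a guard before the reflective `Unwrap` call would let the function return the original value instead of invoking a method on a nil receiver.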

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At record.go, line 35:

<comment>Typed nil pointers are not guarded before calling Unwrap via reflection, which can panic if the method dereferences the nil receiver.</comment>

<file context>
@@ -0,0 +1,58 @@
+// reflection probe finds Unwrap(), calls it, gets an error, fails the
+// Schema check, and returns the original obj.
+func UnwrapSchema(obj any) any {
+	if obj == nil {
+		return obj
+	}
</file context>

Comment thread consume_test.go
t.Fatal("first store: want loaded=false (newly created)")
}

second := &consumeJTI{JTI: "abc"}

P2: TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.
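
The missing assertion could be sketched as follows. `memStore` and `record` are hypothetical in-memory stand-ins (not concurrency-safe, and not the client from this PR); the point is the final check that a field left empty by the caller is filled from the stored record when loaded=true.

```go
package main

import "fmt"

// record is a hypothetical entity keyed by JTI.
type record struct {
	JTI     string
	Subject string
}

// memStore is a hypothetical single-goroutine stand-in for the client.
type memStore struct{ byJTI map[string]record }

// loadOrStore inserts *obj when its JTI is absent. When the JTI is already
// present it overwrites *obj with the stored record and reports loaded=true.
func (s *memStore) loadOrStore(obj *record) (loaded bool) {
	if existing, ok := s.byJTI[obj.JTI]; ok {
		*obj = existing
		return true
	}
	s.byJTI[obj.JTI] = *obj
	return false
}

// checkHydration returns an error describing the first failed expectation.
func checkHydration() error {
	s := &memStore{byJTI: map[string]record{}}
	first := &record{JTI: "abc", Subject: "alice"}
	if s.loadOrStore(first) {
		return fmt.Errorf("first store: want loaded=false")
	}
	second := &record{JTI: "abc"} // Subject deliberately left empty
	if !s.loadOrStore(second) {
		return fmt.Errorf("second store: want loaded=true")
	}
	if second.Subject != "alice" {
		return fmt.Errorf("second.Subject = %q, want %q", second.Subject, "alice")
	}
	return nil
}

func main() {
	fmt.Println(checkHydration()) // <nil>
}
```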

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At consume_test.go, line 44:

<comment>TestLoadOrStore does not assert object hydration on the loaded=true path, so a regression where LoadOrStore returns loaded=true without populating the existing record's fields would pass silently.</comment>

<file context>
@@ -0,0 +1,89 @@
+		t.Fatal("first store: want loaded=false (newly created)")
+	}
+
+	second := &consumeJTI{JTI: "abc"}
+	loaded, err = conn.LoadOrStore(ctx, second, "jti")
+	if err != nil {
</file context>

Comment thread retry_test.go
}

callCount := 0
err := client.WithRetry(ctx, slowPolicy, func() error {

P2: TestWithRetryContextCancellation claims to verify context cancellation during backoff sleep, but the callback returns ctx.Err() (not dgo.ErrAborted), so WithRetry returns immediately and never enters the backoff/retry loop. The select block handling ctx.Done() during delay is never exercised, providing false confidence.
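
A test that actually reaches the backoff wait needs the callback to return the retryable error and the cancellation to arrive while the wait is in progress. The sketch below shows one way to arrange that; `retry`, `errAborted`, and `cancelDuringBackoff` are hypothetical names, and the loop is a simplified stand-in for the PR's `WithRetry`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errAborted is a hypothetical sentinel standing in for the abort error.
var errAborted = errors.New("transaction aborted")

// retry calls fn until it stops returning errAborted, waiting delay between
// attempts and returning ctx.Err() if ctx is cancelled during a wait.
func retry(ctx context.Context, maxRetries int, delay time.Duration, fn func() error) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = fn(); !errors.Is(err, errAborted) {
			return err
		}
		if attempt == maxRetries {
			break
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}
	return err
}

// cancelDuringBackoff returns the number of calls made and the final error
// when the context is cancelled while the first backoff wait is in progress.
func cancelDuringBackoff() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	err := retry(ctx, 5, time.Hour, func() error {
		calls++
		// Cancel shortly after returning so the wait is what observes it.
		time.AfterFunc(10*time.Millisecond, cancel)
		return errAborted
	})
	return calls, err
}

func main() {
	calls, err := cancelDuringBackoff()
	fmt.Println(calls, errors.Is(err, context.Canceled)) // 1 true
}
```

The one-hour delay guarantees the timer branch cannot fire first, so a passing result means the `ctx.Done()` branch was taken.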

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry_test.go, line 115:

<comment>`TestWithRetryContextCancellation` claims to verify context cancellation during backoff sleep, but the callback returns `ctx.Err()` (not `dgo.ErrAborted`), so `WithRetry` returns immediately and never enters the backoff/retry loop. The `select` block handling `ctx.Done()` during delay is never exercised, providing false confidence.</comment>

<file context>
@@ -0,0 +1,197 @@
+	}
+
+	callCount := 0
+	err := client.WithRetry(ctx, slowPolicy, func() error {
+		callCount++
+		// Always return an error that looks like an abort to trigger retry.
</file context>

