Skip to content

Commit 9ffab52

Browse files
eileenaaaShiva
andauthored
fix: fetch schema over network to ensure @unique check across groups (#9596)
**Description** This PR fixes an issue where the `@unique` constraint is bypassed in multi-group Dgraph environments. This occurs when a mutation is processed by an Alpha node that does not own the predicate, as the local schema state lacks the necessary metadata for "remote" predicates.(see #9565) Closes #9565 To address this, I've implemented an on-demand fetching mechanism: - **On-Demand Fetching**: If a predicate is missing from the local `schema.State`, the Alpha node now fetches the schema over the network from the owner group using `worker.GetSchemaOverNetwork`. - **Per-Request Cache**: Introduced a `repaired` map within `addQueryIfUnique` to cache these network lookups for the duration of the request. This prevents RPC amplification and ensures performance remains stable during large mutations. - **Scope Isolation**: The fetched metadata is only used for the current request context, preventing any potential stale data issues in the global schema state. This ensures that the `@unique` constraint is correctly validated cluster-wide, regardless of which group handles the mutation. **Checklist** - [x] The PR title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary) syntax, leading with `fix:`, `feat:`, `chore:`, `ci:`, etc. - [x] Code compiles correctly and linting (via trunk) passes locally - [x] Tests added for new functionality, or regression tests for bug fixes added as applicable --------- Co-authored-by: Shiva <shiva@Shivajis-MacBook-Pro.local>
1 parent da18664 commit 9ffab52

File tree

3 files changed

+230
-1
lines changed

3 files changed

+230
-1
lines changed

dgraphapi/cluster.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type HTTPClient struct {
6363
dqlURL string
6464
dqlMutateUrl string
6565
alphaStateUrl string
66+
moveTabletURL string
6667
}
6768

6869
// GraphQLParams are used for making graphql requests to dgraph
@@ -696,6 +697,30 @@ func (hc *HTTPClient) Mutate(mutation string, commitNow bool) ([]byte, error) {
696697
return DoReq(req)
697698
}
698699

700+
func (hc *HTTPClient) MoveTablet(predicate string, group uint32) error {
701+
url := fmt.Sprintf("%s?tablet=%s&group=%d", hc.moveTabletURL, predicate, group)
702+
response, err := http.Get(url)
703+
if err != nil {
704+
return errors.Wrapf(err, "error moving tablet via HTTP: predicate=%s, group=%d", predicate, group)
705+
}
706+
defer func() {
707+
if err := response.Body.Close(); err != nil {
708+
log.Printf("[WARNING] error closing body: %v", err)
709+
}
710+
}()
711+
712+
body, err := io.ReadAll(response.Body)
713+
if err != nil {
714+
return errors.Wrapf(err, "error reading move tablet response body")
715+
}
716+
717+
if response.StatusCode != http.StatusOK {
718+
return errors.Errorf("move tablet failed with status %d: %s", response.StatusCode, string(body))
719+
}
720+
721+
return nil
722+
}
723+
699724
// SetupSchema sets up DQL schema
700725
func (gc *GrpcClient) SetupSchema(dbSchema string) error {
701726
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
@@ -784,9 +809,11 @@ func GetHttpClient(alphaUrl, zeroUrl string) (*HTTPClient, error) {
784809

785810
dqlUrl := "http://" + alphaUrl + "/query"
786811
dqlMutateUrl := "http://" + alphaUrl + "/mutate"
812+
moveTabletUrl := "http://" + zeroUrl + "/moveTablet"
787813
return &HTTPClient{
788814
adminURL: adminUrl,
789815
graphqlURL: graphQLUrl,
816+
moveTabletURL: moveTabletUrl,
790817
stateURL: stateUrl,
791818
dqlURL: dqlUrl,
792819
dqlMutateUrl: dqlMutateUrl,

edgraph/server.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1693,6 +1693,41 @@ func addQueryIfUnique(qctx context.Context, qc *queryContext) error {
16931693
}
16941694
isGalaxyQuery := x.IsRootNsOperation(ctx)
16951695

1696+
missingPreds := make(map[string]struct{})
1697+
for _, gmu := range qc.gmuList {
1698+
for _, pred := range gmu.Set {
1699+
currNs := namespace
1700+
if isGalaxyQuery {
1701+
currNs = pred.Namespace
1702+
}
1703+
if pred.Predicate == "dgraph.xid" {
1704+
continue
1705+
}
1706+
fullPred := x.NamespaceAttr(currNs, pred.Predicate)
1707+
if _, ok := schema.State().Get(ctx, fullPred); !ok {
1708+
missingPreds[fullPred] = struct{}{}
1709+
}
1710+
}
1711+
}
1712+
1713+
repaired := make(map[string]bool)
1714+
if len(missingPreds) > 0 {
1715+
predList := make([]string, 0, len(missingPreds))
1716+
for p := range missingPreds {
1717+
predList = append(predList, p)
1718+
}
1719+
1720+
schReq := &pb.SchemaRequest{Predicates: predList}
1721+
remoteNodes, err := worker.GetSchemaOverNetwork(ctx, schReq)
1722+
if err != nil {
1723+
return errors.Wrapf(err, "unique validation failed to fetch schema for predicates %v", predList)
1724+
}
1725+
1726+
for _, node := range remoteNodes {
1727+
repaired[node.Predicate] = node.Unique
1728+
}
1729+
}
1730+
16961731
qc.uniqueVars = map[uint64]uniquePredMeta{}
16971732
for gmuIndex, gmu := range qc.gmuList {
16981733
var buildQuery strings.Builder
@@ -1706,7 +1741,16 @@ func addQueryIfUnique(qctx context.Context, qc *queryContext) error {
17061741
// [TODO] Don't check if it's dgraph.xid. It's a bug as this node might not be aware
17071742
// of the schema for the given predicate. This is a bug issue for dgraph.xid hence
17081743
// we are bypassing it manually until the bug is fixed.
1709-
predSchema, ok := schema.State().Get(ctx, x.NamespaceAttr(namespace, pred.Predicate))
1744+
fullPred := x.NamespaceAttr(namespace, pred.Predicate)
1745+
predSchema, ok := schema.State().Get(ctx, fullPred)
1746+
if !ok {
1747+
u, found := repaired[fullPred]
1748+
if found {
1749+
predSchema.Unique = u
1750+
ok = true
1751+
}
1752+
}
1753+
17101754
if !ok || !predSchema.Unique {
17111755
continue
17121756
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
//go:build integration2
2+
3+
/*
4+
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
5+
* SPDX-License-Identifier: Apache-2.0
6+
*/
7+
8+
package main
9+
10+
import (
11+
"strconv"
12+
"strings"
13+
"testing"
14+
"time"
15+
16+
"github.com/dgraph-io/dgo/v250/protos/api"
17+
"github.com/dgraph-io/dgraph/v25/dgraphtest"
18+
"github.com/dgraph-io/dgraph/v25/protos/pb"
19+
"github.com/stretchr/testify/require"
20+
"google.golang.org/protobuf/encoding/protojson"
21+
)
22+
23+
func TestUniqueMultipleGroups(t *testing.T) {
24+
conf := dgraphtest.NewClusterConfig().WithNumAlphas(2).WithNumZeros(1).WithReplicas(1)
25+
c, err := dgraphtest.NewLocalCluster(conf)
26+
require.NoError(t, err)
27+
defer func() { c.Cleanup(t.Failed()) }()
28+
require.NoError(t, c.Start())
29+
30+
httpClient, err := c.HTTPClient()
31+
require.NoError(t, err)
32+
33+
dg, cleanup, err := c.Client()
34+
require.NoError(t, err)
35+
defer cleanup()
36+
37+
require.NoError(t, dg.DropAll())
38+
39+
// Setup schema
40+
schema := `
41+
name: string @index(exact) .
42+
email_group_1: string @unique @index(exact) .
43+
email_group_2: string @unique @index(exact) .
44+
`
45+
require.NoError(t, dg.SetupSchema(schema))
46+
time.Sleep(2 * time.Second)
47+
48+
// Get the cluster state to find which alphas belong to which groups
49+
var state pb.MembershipState
50+
51+
getStateAndFindGroups := func() (uint32, uint32) {
52+
healthResp, err := httpClient.GetAlphaState()
53+
require.NoError(t, err)
54+
require.NoError(t, protojson.Unmarshal(healthResp, &state))
55+
t.Logf("Retrieved cluster state with %d groups", len(state.Groups))
56+
var e1, e2 uint32
57+
for groupID, group := range state.Groups {
58+
for predName := range group.Tablets {
59+
60+
if predName == "email_group_1" || predName == "0-email_group_1" {
61+
t.Logf("predName: %s", predName)
62+
t.Logf("groupID: %d", groupID)
63+
e1 = groupID
64+
}
65+
if predName == "email_group_2" || predName == "0-email_group_2" {
66+
t.Logf("predName: %s", predName)
67+
t.Logf("groupID: %d", groupID)
68+
e2 = groupID
69+
}
70+
}
71+
}
72+
return e1, e2
73+
}
74+
75+
email1GroupID, email2GroupID := getStateAndFindGroups()
76+
require.NotZero(t, email1GroupID, "email_group_1 predicate should be assigned to a group")
77+
require.NotZero(t, email2GroupID, "email_group_2 predicate should be assigned to a group")
78+
79+
// If both predicates are in the same group, move one to a different group
80+
if email1GroupID == email2GroupID {
81+
var targetGroupID uint32
82+
for gid := range state.Groups {
83+
if gid != email1GroupID && gid > 0 {
84+
targetGroupID = gid
85+
break
86+
}
87+
}
88+
require.NotZero(t, targetGroupID, "need at least two groups to move predicate")
89+
err = httpClient.MoveTablet("email_group_1", targetGroupID)
90+
if err != nil {
91+
require.Contains(t, err.Error(), "already being served by group")
92+
}
93+
t.Logf("Moved email_group_1 predicate to group %d (was in same group as email_group_2)", targetGroupID)
94+
time.Sleep(2 * time.Second)
95+
email1GroupID, email2GroupID = getStateAndFindGroups()
96+
}
97+
98+
require.NotEqual(t, email1GroupID, email2GroupID, "email_group_1 and email_group_2 should be in different groups")
99+
100+
t.Logf("email_group_1 is in group %d, email_group_2 is in group %d", email1GroupID, email2GroupID)
101+
102+
// Helper to find an alpha index in a given group
103+
findAlphaInGroup := func(gid uint32) int {
104+
if group, ok := state.Groups[gid]; ok {
105+
for _, member := range group.Members {
106+
addr := member.Addr
107+
if strings.HasPrefix(addr, "alpha") {
108+
alphaNumStr := strings.TrimPrefix(addr, "alpha")
109+
alphaNumStr = strings.Split(alphaNumStr, ":")[0]
110+
alphaNum, err := strconv.Atoi(alphaNumStr)
111+
if err == nil && alphaNum >= 0 {
112+
return alphaNum
113+
}
114+
}
115+
}
116+
}
117+
return -1
118+
}
119+
alphaForEmailGroup1 := findAlphaInGroup(email1GroupID)
120+
alphaForEmailGroup2 := findAlphaInGroup(email2GroupID)
121+
require.GreaterOrEqual(t, alphaForEmailGroup1, 0, "should find an alpha serving email_group_1")
122+
require.GreaterOrEqual(t, alphaForEmailGroup2, 0, "should find an alpha serving email_group_2")
123+
124+
// Insert initial values for both predicates using default client
125+
_, err = dg.Mutate(&api.Mutation{
126+
SetNquads: []byte(`<0x100> <name> "User One" .
127+
<0x100> <email_group_1> "dup@example.com" .
128+
<0x100> <email_group_2> "other@example.com" .`),
129+
CommitNow: true,
130+
})
131+
require.NoError(t, err)
132+
133+
// Try to insert duplicate into email1 from an alpha that does NOT serve email1 (i.e. from email2's group)
134+
dgNotEmail1, cleanup1, err := c.AlphaClient(alphaForEmailGroup2)
135+
require.NoError(t, err)
136+
defer cleanup1()
137+
rdfDupEmail1 := `<0x200> <name> "User Two" .
138+
<0x200> <email_group_1> "dup@example.com" .`
139+
_, err = dgNotEmail1.Mutate(&api.Mutation{
140+
SetNquads: []byte(rdfDupEmail1),
141+
CommitNow: true,
142+
})
143+
require.Error(t, err)
144+
require.ErrorContains(t, err, "could not insert duplicate value")
145+
146+
// Try to insert duplicate into email2 from an alpha that does NOT serve email2 (i.e. from email1's group)
147+
dgNotEmail2, cleanup2, err := c.AlphaClient(alphaForEmailGroup1)
148+
require.NoError(t, err)
149+
defer cleanup2()
150+
rdfDupEmail2 := `<0x300> <name> "User Three" .
151+
<0x300> <email_group_2> "other@example.com" .`
152+
_, err = dgNotEmail2.Mutate(&api.Mutation{
153+
SetNquads: []byte(rdfDupEmail2),
154+
CommitNow: true,
155+
})
156+
require.Error(t, err)
157+
require.ErrorContains(t, err, "could not insert duplicate value")
158+
}

0 commit comments

Comments
 (0)