diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java index c335d50f0a..d6fbe97964 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java @@ -19,8 +19,11 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.apache.hugegraph.HugeGraphParams; @@ -33,6 +36,7 @@ import org.apache.hugegraph.event.EventListener; import org.apache.hugegraph.meta.MetaDriver; import org.apache.hugegraph.meta.MetaManager; +import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent; import org.apache.hugegraph.perf.PerfUtil; import org.apache.hugegraph.schema.SchemaElement; import org.apache.hugegraph.type.HugeType; @@ -43,6 +47,29 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { + private static final String ID_CACHE_PREFIX = "schema-id"; + private static final String NAME_CACHE_PREFIX = "schema-name"; + + // MetaDriver doesn't expose unlisten, register the meta listener once. + // Lifecycle: this JVM-global flag is intentionally never reset by + // unlistenChanges() (the underlying gRPC watch is process-wide). If that + // watch is silently dropped after a transport reconnect, recovery is not + // automatic; resetMetaListenerForReconnect() is only a manual hook to let + // the next schema operation install a fresh watch. + private static final AtomicBoolean metaEventListenerRegistered = + new AtomicBoolean(false); + + private static final Object META_LISTENER_LOCK = new Object(); + + /** + * Per-JVM identifier emitted with every schema-cache-clear meta event so + * the listener can skip its own echo. Lifecycle: generated once per + * classloader at class init, never reused, regenerated on JVM restart. + * This is not a stable node identity, only a local self-echo filter. + */ + private static final String SCHEMA_CACHE_CLEAR_SOURCE = + UUID.randomUUID().toString(); + private final Cache idCache; private final Cache nameCache; @@ -58,8 +85,8 @@ public CachedSchemaTransactionV2(MetaDriver metaDriver, final long capacity = graphParams.configuration() .get(CoreOptions.SCHEMA_CACHE_CAPACITY); - this.idCache = this.cache("schema-id", capacity); - this.nameCache = this.cache("schema-name", capacity); + this.idCache = this.cache(ID_CACHE_PREFIX, capacity); + this.nameCache = this.cache(NAME_CACHE_PREFIX, capacity); SchemaCaches attachment = this.idCache.attachment(); if (attachment == null) { @@ -86,11 +113,38 @@ public void close() { } private Cache cache(String prefix, long capacity) { - final String name = prefix + "-" + this.graph().spaceGraphName(); + final String name = cacheName(prefix, this.graph().spaceGraphName()); // NOTE: must disable schema cache-expire due to getAllSchema() return CacheManager.instance().cache(name, capacity); } + private static String cacheName(String prefix, String spaceGraphName) { + return prefix + "-" + spaceGraphName; + } + + private static void clearSchemaCache(String spaceGraphName) { + Map> caches = CacheManager.instance().caches(); + + // Clear name cache first so the (name -> id -> object) lookup path + // fails fast instead of returning a stale object backed by an + // already-empty id cache during the TOCTOU window. + Cache nameCache = caches.get(cacheName(NAME_CACHE_PREFIX, + spaceGraphName)); + if (nameCache != null) { + nameCache.clear(); + } + + Cache idCache = caches.get(cacheName(ID_CACHE_PREFIX, + spaceGraphName)); + if (idCache != null) { + SchemaCaches arrayCaches = idCache.attachment(); + if (arrayCaches != null) { + arrayCaches.clear(); + } + idCache.clear(); + } + } + private void listenChanges() { // Listen store event: "store.init", "store.clear", ... Set storeEvents = ImmutableSet.of(Events.STORE_INIT, @@ -100,7 +154,8 @@ private void listenChanges() { if (storeEvents.contains(event.name())) { LOG.debug("Graph {} clear schema cache on event '{}'", this.graph(), event.name()); - this.clearCache(true); + boolean notify = !Events.STORE_INIT.equals(event.name()); + this.clearCache(notify); return true; } return false; @@ -145,12 +200,88 @@ private void listenChanges() { if (!schemaEventHub.containsListener(Events.CACHE)) { schemaEventHub.listen(Events.CACHE, this.cacheEventListener); } + + listenSchemaCacheClear(); + } + + private static void listenSchemaCacheClear() { + synchronized (META_LISTENER_LOCK) { + if (metaEventListenerRegistered.get()) { + return; + } + try { + MetaManager.instance().listenSchemaCacheClear( + CachedSchemaTransactionV2::handleSchemaCacheClearEvent); + // Set AFTER the underlying watch is live so a concurrent + // caller that observes the flag is guaranteed an active + // subscription, and a failure leaves the flag false so the + // next caller retries registration. + metaEventListenerRegistered.set(true); + } catch (Exception e) { + throw e instanceof RuntimeException + ? (RuntimeException) e + : new RuntimeException( + "Failed to register schema cache clear listener", + e); + } + } + } + + /** + * Consumer invoked by the MetaManager schema-cache-clear watch. Extracted + * as a package-private static method so end-to-end tests can drive the + * publish -> callback -> {@link #clearSchemaCache(String)} path without + * depending on a live etcd/PD watch. + */ + static void handleSchemaCacheClearEvent(T response) { + List events = + MetaManager.instance() + .extractSchemaCacheClearEventsFromResponse( + response); + if (events == null) { + return; + } + for (SchemaCacheClearEvent event : events) { + if (SCHEMA_CACHE_CLEAR_SOURCE.equals(event.source())) { + continue; + } + String graphName = event.graph(); + LOG.debug("Graph {} clear schema cache on meta event", graphName); + clearSchemaCache(graphName); + } + } + + /** + * Manually reset the JVM-global meta listener flag after detecting that + * the MetaManager transport reconnected and dropped the underlying gRPC + * watch. This method is not wired to a MetaManager/MetaDriver reconnect + * callback today; callers must invoke it explicitly after detecting that + * condition. Without such a manual reset {@link #metaEventListenerRegistered} + * would stay {@code true} forever and this JVM would stop receiving + * cross-node schema cache clear events with no error or warning. + * + *

TODO: wire this into MetaManager once it exposes a transport + * reconnect callback (e.g. {@code listenReconnect} / + * {@code onTransportReconnect}). Until then it must be invoked + * explicitly by code that detects the reconnect. + */ + public static void resetMetaListenerForReconnect() { + if (metaEventListenerRegistered.compareAndSet(true, false)) { + LOG.warn("Schema cache clear meta listener lost on reconnect - " + + "will re-register on next schema operation."); + } } public void clearCache(boolean notify) { - this.idCache.clear(); + // Same TOCTOU ordering as clearSchemaCache(String): clear nameCache + // first, then the array attachment, then idCache last. this.nameCache.clear(); this.arrayCaches.clear(); + this.idCache.clear(); + + if (notify) { + this.maybeNotifySchemaCacheClear(); + } } private void resetCachedAllIfReachedCapacity() { @@ -202,6 +333,8 @@ protected void updateSchema(SchemaElement schema, super.updateSchema(schema, updateCallback); this.updateCache(schema); + // Status transitions are internal bookkeeping; notifying here causes a + // broadcast storm for every updateSchemaStatus() call from background jobs. } @Override @@ -210,11 +343,9 @@ protected void addSchema(SchemaElement schema) { this.updateCache(schema); - if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) { - MetaManager.instance() - .notifySchemaCacheClear(this.graph().graphSpace(), - this.graph().name()); - } + // Schema additions must always propagate to remote nodes regardless + // of TASK_SYNC_DELETION (which only gates removal flows). + this.notifySchemaCacheClear(); } private void updateCache(SchemaElement schema) { @@ -238,13 +369,25 @@ public void removeSchema(SchemaElement schema) { this.invalidateCache(schema.type(), schema.id()); + this.maybeNotifySchemaCacheClear(); + } + + private void maybeNotifySchemaCacheClear() { + // Only suppress notifications for removal tasks when + // TASK_SYNC_DELETION=true: the caller propagates cache invalidation + // synchronously, so the meta-event broadcast would be redundant. if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) { - MetaManager.instance() - .notifySchemaCacheClear(this.graph().graphSpace(), - this.graph().name()); + this.notifySchemaCacheClear(); } } + private void notifySchemaCacheClear() { + MetaManager.instance() + .notifySchemaCacheClear(this.graph().graphSpace(), + this.graph().name(), + SCHEMA_CACHE_CLEAR_SOURCE); + } + @Override @SuppressWarnings("unchecked") protected T getSchema(HugeType type, Id id) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java index 551b21997e..6637baf22c 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/MetaManager.java @@ -57,11 +57,16 @@ import org.apache.hugegraph.space.SchemaTemplate; import org.apache.hugegraph.space.Service; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.JsonUtil; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; import com.google.common.collect.ImmutableMap; public class MetaManager { + private static final Logger LOG = Log.logger(MetaManager.class); + public static final String META_PATH_DELIMITER = "/"; public static final String META_PATH_JOIN = "-"; @@ -119,6 +124,8 @@ public class MetaManager { public static final long LOCK_DEFAULT_LEASE = 30L; public static final long LOCK_DEFAULT_TIMEOUT = 10L; public static final int RANDOM_USER_ID = 100; + private static final String SCHEMA_CACHE_CLEAR_GRAPH_KEY = "graph"; + private static final String SCHEMA_CACHE_CLEAR_SOURCE_KEY = "source"; private static final String META_PATH_URLS = "URLS"; private static final String META_PATH_PD_PEERS = "HSTORE_PD_PEERS"; private static final MetaManager INSTANCE = new MetaManager(); @@ -380,6 +387,23 @@ public List extractGraphsFromResponse(T response) { return this.metaDriver.extractValuesFromResponse(response); } + public List extractSchemaCacheClearEventsFromResponse( + T response) { + List values = this.metaDriver.extractValuesFromResponse(response); + if (values == null) { + return null; + } + + List events = new ArrayList<>(values.size()); + for (String value : values) { + SchemaCacheClearEvent event = SchemaCacheClearEvent.fromValue(value); + if (event != null) { + events.add(event); + } + } + return events; + } + public Map extractKVFromResponse(T response) { return this.metaDriver.extractKVFromResponse(response); } @@ -499,7 +523,12 @@ public void notifyGraphClear(String graphSpace, String graph) { } public void notifySchemaCacheClear(String graphSpace, String graph) { - this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph); + this.notifySchemaCacheClear(graphSpace, graph, null); + } + + public void notifySchemaCacheClear(String graphSpace, String graph, + String source) { + this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph, source); } public void notifyGraphCacheClear(String graphSpace, String graph) { @@ -1287,6 +1316,70 @@ public void setWhiteIpStatus(boolean status) { this.metaDriver.put(key, ((Boolean) status).toString()); } + public static String schemaCacheClearEventValue(String graph, + String source) { + if (StringUtils.isEmpty(source)) { + return graph; + } + return JsonUtil.toJson(ImmutableMap.of(SCHEMA_CACHE_CLEAR_GRAPH_KEY, + graph, + SCHEMA_CACHE_CLEAR_SOURCE_KEY, + source)); + } + + public static final class SchemaCacheClearEvent { + + private final String graph; + private final String source; + + private SchemaCacheClearEvent(String graph, String source) { + this.graph = graph; + this.source = source; + } + + public String graph() { + return this.graph; + } + + public String source() { + return this.source; + } + + @SuppressWarnings("unchecked") + static SchemaCacheClearEvent fromValue(String value) { + if (StringUtils.isEmpty(value)) { + return null; + } + // Compatibility: events published before source-id support stored + // only the graph name as a plain string. Keep accepting that format + // so mixed-version clusters can consume old/new schema-cache-clear + // events during rolling upgrades. + if (value.charAt(0) != '{') { + return new SchemaCacheClearEvent(value, null); + } + + Map payload; + try { + payload = JsonUtil.fromJson(value, Map.class); + } catch (RuntimeException e) { + LOG.debug("Malformed schema-cache-clear payload, ignoring: {}", + value, e); + return null; + } + + Object graph = payload.get(SCHEMA_CACHE_CLEAR_GRAPH_KEY); + if (graph == null) { + LOG.debug("Schema-cache-clear payload missing '{}' field: {}", + SCHEMA_CACHE_CLEAR_GRAPH_KEY, value); + return null; + } + + Object source = payload.get(SCHEMA_CACHE_CLEAR_SOURCE_KEY); + String sourceValue = source == null ? null : source.toString(); + return new SchemaCacheClearEvent(graph.toString(), sourceValue); + } + } + public enum MetaDriverType { ETCD, PD diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java index 8d00bfabb2..52b2c3946e 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/managers/GraphMetaManager.java @@ -33,6 +33,7 @@ import static org.apache.hugegraph.meta.MetaManager.META_PATH_SYS_GRAPH_CONF; import static org.apache.hugegraph.meta.MetaManager.META_PATH_UPDATE; import static org.apache.hugegraph.meta.MetaManager.META_PATH_VERTEX_LABEL; +import static org.apache.hugegraph.meta.MetaManager.schemaCacheClearEventValue; import java.util.Map; import java.util.function.Consumer; @@ -94,8 +95,14 @@ public void notifyGraphClear(String graphSpace, String graph) { } public void notifySchemaCacheClear(String graphSpace, String graph) { + this.notifySchemaCacheClear(graphSpace, graph, null); + } + + public void notifySchemaCacheClear(String graphSpace, String graph, + String source) { this.metaDriver.put(this.schemaCacheClearKey(), - graphName(graphSpace, graph)); + schemaCacheClearEventValue( + graphName(graphSpace, graph), source)); } public void notifyGraphCacheClear(String graphSpace, String graph) { diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java new file mode 100644 index 0000000000..852d4eae0d --- /dev/null +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/MetaManagerSchemaCacheClearEventTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.meta; + +import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent; +import org.apache.hugegraph.testutil.Assert; +import org.junit.Test; + +public class MetaManagerSchemaCacheClearEventTest { + + @Test + public void testFromValueReturnsNullForEmptyPayload() { + Assert.assertNull(SchemaCacheClearEvent.fromValue(null)); + Assert.assertNull(SchemaCacheClearEvent.fromValue("")); + } + + @Test + public void testFromValueParsesLegacyPlainGraphName() { + SchemaCacheClearEvent event = + SchemaCacheClearEvent.fromValue("DEFAULT-graph1"); + + assertEvent(event, "DEFAULT-graph1", null); + } + + @Test + public void testFromValueIgnoresMalformedJson() { + Assert.assertNull(SchemaCacheClearEvent.fromValue("{not-json")); + } + + @Test + public void testFromValueParsesJsonWithSource() { + String value = MetaManager.schemaCacheClearEventValue("g", "u"); + SchemaCacheClearEvent event = SchemaCacheClearEvent.fromValue(value); + + assertEvent(event, "g", "u"); + } + + @Test + public void testFromValueParsesJsonWithoutSource() { + SchemaCacheClearEvent event = + SchemaCacheClearEvent.fromValue("{\"graph\":\"g\"}"); + + assertEvent(event, "g", null); + } + + @Test + public void testFromValueIgnoresJsonWithoutGraph() { + Assert.assertNull( + SchemaCacheClearEvent.fromValue("{\"source\":\"u\"}")); + } + + private static void assertEvent(SchemaCacheClearEvent event, + String graph, + String source) { + Assert.assertNotNull(event); + Assert.assertEquals(graph, event.graph()); + Assert.assertEquals(source, event.source()); + } +} diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index 4a62e48bb5..ce249a967e 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.unit; import org.apache.hugegraph.core.RoleElectionStateMachineTest; +import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest; import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest; import org.apache.hugegraph.unit.api.filter.PathFilterTest; import org.apache.hugegraph.unit.api.gremlin.GremlinQueryAPITest; @@ -92,6 +93,7 @@ CacheTest.OffheapCacheTest.class, CacheTest.LevelCacheTest.class, CachedSchemaTransactionTest.class, + MetaManagerSchemaCacheClearEventTest.class, CachedGraphTransactionTest.class, CacheManagerTest.class, RamTableTest.class, diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java index 83ba5097fa..efb63fad27 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/cache/CachedSchemaTransactionTest.java @@ -17,11 +17,31 @@ package org.apache.hugegraph.unit.cache; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + import org.apache.hugegraph.HugeFactory; import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.backend.cache.Cache; +import org.apache.hugegraph.backend.cache.CacheManager; import org.apache.hugegraph.backend.cache.CachedSchemaTransaction; +import org.apache.hugegraph.backend.cache.CachedSchemaTransactionV2; +import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.meta.MetaDriver; +import org.apache.hugegraph.meta.MetaManager; +import org.apache.hugegraph.meta.managers.GraphMetaManager; +import org.apache.hugegraph.schema.SchemaElement; import org.apache.hugegraph.testutil.Assert; import org.apache.hugegraph.testutil.Whitebox; import org.apache.hugegraph.type.HugeType; @@ -31,6 +51,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import com.google.common.collect.ImmutableMap; @@ -165,6 +187,528 @@ public void testGetSchema() throws Exception { cache.getPropertyKey(IdGenerator.of(1)).name()); } + @Test + public void testClearV2SchemaCacheByGraphName() { + String graphName = "DEFAULT-unit-test-v2"; + String otherGraphName = "DEFAULT-other-v2"; + + Cache idCache = CacheManager.instance() + .cache("schema-id-" + + graphName, 10L); + Cache nameCache = CacheManager.instance() + .cache("schema-name-" + + graphName, 10L); + Cache otherIdCache = CacheManager.instance() + .cache("schema-id-" + + otherGraphName, + 10L); + Object arrayCaches = idCache.attachment(newV2SchemaCaches(10)); + Id arrayCacheId = IdGenerator.of(1); + SchemaElement arrayCacheSchema = + new FakeObjects("unit-test-v2") + .newPropertyKey(arrayCacheId, "fake-pk-array"); + + try { + clearV2SchemaCaches(arrayCaches); + setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, arrayCacheId, + arrayCacheSchema); + idCache.update(IdGenerator.of(1), "fake-pk-by-id"); + nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name"); + otherIdCache.update(IdGenerator.of(2), "other-pk-by-id"); + + Assert.assertEquals(1L, idCache.size()); + Assert.assertEquals(1L, nameCache.size()); + Assert.assertEquals(1L, otherIdCache.size()); + Assert.assertSame(arrayCacheSchema, + getV2SchemaCache(arrayCaches, + HugeType.PROPERTY_KEY, + arrayCacheId)); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{String.class}, + "clearSchemaCache", graphName); + + Assert.assertEquals(0L, idCache.size()); + Assert.assertEquals(0L, nameCache.size()); + Assert.assertEquals(1L, otherIdCache.size()); + Assert.assertNull(getV2SchemaCache(arrayCaches, + HugeType.PROPERTY_KEY, + arrayCacheId)); + } finally { + clearV2SchemaCaches(arrayCaches); + idCache.clear(); + nameCache.clear(); + otherIdCache.clear(); + } + } + + private static Object newV2SchemaCaches(int size) { + for (Class clazz : + CachedSchemaTransactionV2.class.getDeclaredClasses()) { + if (!"SchemaCaches".equals(clazz.getSimpleName())) { + continue; + } + try { + Constructor constructor = + clazz.getDeclaredConstructor(int.class); + constructor.setAccessible(true); + return constructor.newInstance(size); + } catch (ReflectiveOperationException e) { + throw new AssertionError("Failed to create SchemaCaches", e); + } + } + throw new AssertionError("SchemaCaches class not found"); + } + + private static void clearV2SchemaCaches(Object arrayCaches) { + Whitebox.invoke(arrayCaches.getClass(), "clear", arrayCaches); + } + + private static void setV2SchemaCache(Object arrayCaches, HugeType type, + Id id, SchemaElement schema) { + Whitebox.invoke(arrayCaches.getClass(), + new Class[]{HugeType.class, Id.class, + SchemaElement.class}, + "set", arrayCaches, type, id, schema); + } + + private static SchemaElement getV2SchemaCache(Object arrayCaches, + HugeType type, Id id) { + return Whitebox.invoke(arrayCaches.getClass(), + new Class[]{HugeType.class, Id.class}, + "get", arrayCaches, type, id); + } + + @Test + public void testListenSchemaCacheClearIsIdempotent() throws Exception { + // Once the JVM-global registration flag is set, every subsequent + // call to listenSchemaCacheClear() must short-circuit before + // touching MetaManager — even under concurrent invocation. Pre-set + // the flag, race N threads, and verify none of them propagated an + // exception (which would happen if MetaManager.instance() + // .listenSchemaCacheClear were invoked without an initialised + // driver). + Field flagField = CachedSchemaTransactionV2.class + .getDeclaredField("metaEventListenerRegistered"); + flagField.setAccessible(true); + AtomicBoolean flag = (AtomicBoolean) flagField.get(null); + boolean previous = flag.getAndSet(true); + try { + int threads = 8; + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threads); + AtomicInteger failures = new AtomicInteger(); + for (int i = 0; i < threads; i++) { + new Thread(() -> { + try { + start.await(); + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + "listenSchemaCacheClear"); + } catch (Throwable t) { + failures.incrementAndGet(); + } finally { + done.countDown(); + } + }).start(); + } + start.countDown(); + Assert.assertTrue("listenSchemaCacheClear race timed out", + done.await(10, TimeUnit.SECONDS)); + Assert.assertEquals("listenSchemaCacheClear must short-circuit " + + "when already registered", 0, failures.get()); + Assert.assertTrue("registration flag must remain set", flag.get()); + } finally { + flag.set(previous); + } + } + + @Test + public void testClearSchemaCacheClearsArrayAttachmentMaps() + throws Exception { + // clearSchemaCache() must wipe idCache, nameCache and every internal + // IntObjectMap (pks/vls/els/ils) inside the array attachment so + // stale entries are not served after a meta event. + String graphName = "DEFAULT-unit-test-v2-array"; + Cache idCache = + CacheManager.instance().cache("schema-id-" + graphName, 10L); + Cache nameCache = + CacheManager.instance().cache("schema-name-" + graphName, 10L); + // Size must comfortably exceed the largest id below: IntObjectMap + // grows by doubling and refuses to write past currentSize even after + // a single expansion, so a small capacity rejects mid-range keys. + Object arrayCaches = idCache.attachment(newV2SchemaCaches(64)); + Id pkId = IdGenerator.of(1); + Id vlId = IdGenerator.of(2); + Id elId = IdGenerator.of(3); + Id ilId = IdGenerator.of(4); + FakeObjects fakeObjects = new FakeObjects("unit-test-v2-array"); + SchemaElement pk = fakeObjects.newPropertyKey(pkId, "fake-pk"); + + try { + clearV2SchemaCaches(arrayCaches); + setV2SchemaCache(arrayCaches, HugeType.PROPERTY_KEY, pkId, pk); + setV2SchemaCache(arrayCaches, HugeType.VERTEX_LABEL, vlId, pk); + setV2SchemaCache(arrayCaches, HugeType.EDGE_LABEL, elId, pk); + setV2SchemaCache(arrayCaches, HugeType.INDEX_LABEL, ilId, pk); + idCache.update(pkId, "fake-pk-by-id"); + nameCache.update(IdGenerator.of("fake-pk"), "fake-pk-by-name"); + + Assert.assertEquals(1L, idCache.size()); + Assert.assertEquals(1L, nameCache.size()); + Assert.assertNotNull(getV2SchemaCache(arrayCaches, + HugeType.PROPERTY_KEY, pkId)); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{String.class}, + "clearSchemaCache", graphName); + + Assert.assertEquals(0L, idCache.size()); + Assert.assertEquals(0L, nameCache.size()); + for (String mapName : new String[]{"pks", "vls", "els", "ils"}) { + Object intMap = readField(arrayCaches, mapName); + assertIntObjectMapEmpty(intMap, mapName); + } + Map cachedTypes = readField(arrayCaches, + "cachedTypes"); + Assert.assertTrue("cachedTypes must be empty after clear", + cachedTypes.isEmpty()); + } finally { + clearV2SchemaCaches(arrayCaches); + idCache.clear(); + nameCache.clear(); + } + } + + // TASK_SYNC_DELETION gating of removeSchema notifications and the + // unconditional addSchema notification require an initialised + // CachedSchemaTransactionV2 instance, which in turn needs an hstore + // backend and a connected MetaManager. Both prerequisites are out of + // scope for this unit test class. They are exercised end-to-end by the + // hstore integration tests in CoreTestSuite. TODO(#2617): port these + // assertions into a dedicated CachedSchemaTransactionV2IT once + // mockito-inline becomes available so MetaManager.instance() can be + // stubbed without an hstore cluster. + + @Test + public void testHandleSchemaCacheClearEventSkipsLocalSource() + throws Exception { + String graphName = "DEFAULT-meta-local-source-v2"; + Cache idCache = + CacheManager.instance().cache("schema-id-" + graphName, 10L); + Cache nameCache = + CacheManager.instance() + .cache("schema-name-" + graphName, 10L); + + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + Object localResponse = new Object(); + Object remoteResponse = new Object(); + String localSource = schemaCacheClearSource(); + Mockito.when(mockDriver.extractValuesFromResponse(localResponse)) + .thenReturn(Collections.singletonList( + MetaManager.schemaCacheClearEventValue(graphName, + localSource))); + Mockito.when(mockDriver.extractValuesFromResponse(remoteResponse)) + .thenReturn(Collections.singletonList( + MetaManager.schemaCacheClearEventValue(graphName, + "remote"))); + + MetaDriver originalDriver = swapMetaDriver(mockDriver); + try { + idCache.update(IdGenerator.of(1), "v"); + nameCache.update(IdGenerator.of("n"), "v"); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{Object.class}, + "handleSchemaCacheClearEvent", + localResponse); + + Assert.assertEquals("local echo must not clear id cache", + 1L, idCache.size()); + Assert.assertEquals("local echo must not clear name cache", + 1L, nameCache.size()); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{Object.class}, + "handleSchemaCacheClearEvent", + remoteResponse); + + Assert.assertEquals(0L, idCache.size()); + Assert.assertEquals(0L, nameCache.size()); + } finally { + swapMetaDriver(originalDriver); + idCache.clear(); + nameCache.clear(); + } + } + + @Test + public void testHandleSchemaCacheClearEventClearsTargetGraphOnly() + throws Exception { + // End-to-end coverage of the meta-event consumer: + // publish (response) -> MetaManager extract -> clearSchemaCache + // We bypass the live etcd/PD watch by stubbing MetaDriver on the + // MetaManager singleton and invoking the package-private consumer + // directly. This validates that only the targeted graph's caches are + // cleared and that other graphs in the same JVM are left untouched. + String targetGraph = "DEFAULT-meta-target-v2"; + String otherGraph = "DEFAULT-meta-other-v2"; + + Cache targetIdCache = + CacheManager.instance().cache("schema-id-" + targetGraph, 10L); + Cache targetNameCache = + CacheManager.instance() + .cache("schema-name-" + targetGraph, 10L); + Cache otherIdCache = + CacheManager.instance().cache("schema-id-" + otherGraph, 10L); + + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + Object response = new Object(); + Mockito.when(mockDriver.extractValuesFromResponse(response)) + .thenReturn(Arrays.asList(targetGraph)); + + MetaDriver originalDriver = swapMetaDriver(mockDriver); + try { + targetIdCache.update(IdGenerator.of(1), "v"); + targetNameCache.update(IdGenerator.of("n"), "v"); + otherIdCache.update(IdGenerator.of(2), "v"); + + Assert.assertEquals(1L, targetIdCache.size()); + Assert.assertEquals(1L, targetNameCache.size()); + Assert.assertEquals(1L, otherIdCache.size()); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{Object.class}, + "handleSchemaCacheClearEvent", response); + + Assert.assertEquals(0L, targetIdCache.size()); + Assert.assertEquals(0L, targetNameCache.size()); + Assert.assertEquals("Other graph caches must remain untouched", + 1L, otherIdCache.size()); + } finally { + swapMetaDriver(originalDriver); + targetIdCache.clear(); + targetNameCache.clear(); + otherIdCache.clear(); + } + } + + @Test + public void testHandleSchemaCacheClearEventNullGraphsIsNoop() + throws Exception { + // A response that yields no graph names (extractor returns null) must + // be a strict noop: caches stay populated. + String graphName = "DEFAULT-meta-noop-v2"; + Cache idCache = + CacheManager.instance().cache("schema-id-" + graphName, 10L); + + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + Object response = new Object(); + Mockito.when(mockDriver.extractValuesFromResponse(response)) + .thenReturn(null); + + MetaDriver originalDriver = swapMetaDriver(mockDriver); + try { + idCache.update(IdGenerator.of(1), "v"); + Assert.assertEquals(1L, idCache.size()); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{Object.class}, + "handleSchemaCacheClearEvent", response); + + Assert.assertEquals("noop response must not clear any cache", + 1L, idCache.size()); + } finally { + swapMetaDriver(originalDriver); + idCache.clear(); + } + } + + @Test + public void testHandleSchemaCacheClearEventClearsMultipleGraphs() + throws Exception { + // A single meta event may carry multiple graph names; every one of + // them must have its V2 caches cleared. + String graphA = "DEFAULT-meta-multi-a"; + String graphB = "DEFAULT-meta-multi-b"; + Cache idA = + CacheManager.instance().cache("schema-id-" + graphA, 10L); + Cache idB = + CacheManager.instance().cache("schema-id-" + graphB, 10L); + + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + Object response = new Object(); + Mockito.when(mockDriver.extractValuesFromResponse(response)) + .thenReturn(Arrays.asList(graphA, graphB)); + + MetaDriver originalDriver = swapMetaDriver(mockDriver); + try { + idA.update(IdGenerator.of(1), "v"); + idB.update(IdGenerator.of(2), "v"); + Assert.assertEquals(1L, idA.size()); + Assert.assertEquals(1L, idB.size()); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + new Class[]{Object.class}, + "handleSchemaCacheClearEvent", response); + + Assert.assertEquals(0L, idA.size()); + Assert.assertEquals(0L, idB.size()); + } finally { + swapMetaDriver(originalDriver); + idA.clear(); + idB.clear(); + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testListenSchemaCacheClearRegistersOnlyOnce() throws Exception { + // Two CachedSchemaTransactionV2 instances in the same JVM must share + // the JVM-global meta listener: only ONE underlying watch should be + // installed even if listenSchemaCacheClear() is invoked multiple + // times. We assert this directly against the MetaDriver mock. + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + GraphMetaManager mockGraphMgr = + new GraphMetaManager(mockDriver, "test-cluster"); + + AtomicBoolean flag = metaListenerFlag(); + boolean previousFlag = flag.getAndSet(false); + MetaDriver originalDriver = swapMetaDriver(mockDriver); + Object originalGraphMgr = swapGraphMetaManager(mockGraphMgr); + try { + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + "listenSchemaCacheClear"); + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + "listenSchemaCacheClear"); + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + "listenSchemaCacheClear"); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(Consumer.class); + Mockito.verify(mockDriver, Mockito.times(1)) + .listen(Mockito.anyString(), captor.capture()); + Assert.assertNotNull("registered consumer must not be null", + captor.getValue()); + Assert.assertTrue("flag must be set after successful registration", + flag.get()); + } finally { + flag.set(previousFlag); + swapMetaDriver(originalDriver); + swapGraphMetaManager(originalGraphMgr); + } + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testListenSchemaCacheClearEndToEnd() throws Exception { + // Full publish -> callback -> clear path: register the listener via + // the production code, capture the consumer that was wired into the + // MetaDriver, then invoke it as the watch would and assert the V2 + // caches for the named graph are cleared. + String graphName = "DEFAULT-end-to-end-v2"; + Cache idCache = + CacheManager.instance().cache("schema-id-" + graphName, 10L); + Cache nameCache = + CacheManager.instance() + .cache("schema-name-" + graphName, 10L); + + MetaDriver mockDriver = Mockito.mock(MetaDriver.class); + Object response = new Object(); + Mockito.when(mockDriver.extractValuesFromResponse(response)) + .thenReturn(Collections.singletonList(graphName)); + GraphMetaManager mockGraphMgr = + new GraphMetaManager(mockDriver, "test-cluster"); + + AtomicBoolean flag = metaListenerFlag(); + boolean previousFlag = flag.getAndSet(false); + MetaDriver originalDriver = swapMetaDriver(mockDriver); + Object originalGraphMgr = swapGraphMetaManager(mockGraphMgr); + try { + idCache.update(IdGenerator.of(1), "v"); + nameCache.update(IdGenerator.of("n"), "v"); + Assert.assertEquals(1L, idCache.size()); + Assert.assertEquals(1L, nameCache.size()); + + Whitebox.invokeStatic(CachedSchemaTransactionV2.class, + "listenSchemaCacheClear"); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(Consumer.class); + Mockito.verify(mockDriver) + .listen(Mockito.anyString(), captor.capture()); + + // Simulate the meta server publishing a schema-cache-clear event: + // invoke the consumer captured above with a synthetic response. + captor.getValue().accept(response); + + Assert.assertEquals(0L, idCache.size()); + Assert.assertEquals(0L, nameCache.size()); + } finally { + flag.set(previousFlag); + swapMetaDriver(originalDriver); + swapGraphMetaManager(originalGraphMgr); + idCache.clear(); + nameCache.clear(); + } + } + + private static AtomicBoolean metaListenerFlag() throws Exception { + Field f = CachedSchemaTransactionV2.class + .getDeclaredField("metaEventListenerRegistered"); + f.setAccessible(true); + return (AtomicBoolean) f.get(null); + } + + private static String schemaCacheClearSource() throws Exception { + Field f = CachedSchemaTransactionV2.class + .getDeclaredField("SCHEMA_CACHE_CLEAR_SOURCE"); + f.setAccessible(true); + return (String) f.get(null); + } + + private static MetaDriver swapMetaDriver(MetaDriver replacement) + throws Exception { + Field f = MetaManager.class.getDeclaredField("metaDriver"); + f.setAccessible(true); + MetaDriver previous = (MetaDriver) f.get(MetaManager.instance()); + f.set(MetaManager.instance(), replacement); + return previous; + } + + private static Object swapGraphMetaManager(Object replacement) + throws Exception { + Field f = MetaManager.class.getDeclaredField("graphMetaManager"); + f.setAccessible(true); + Object previous = f.get(MetaManager.instance()); + f.set(MetaManager.instance(), replacement); + return previous; + } + + @SuppressWarnings("unchecked") + private static T readField(Object target, String name) + throws ReflectiveOperationException { + Field field = target.getClass().getDeclaredField(name); + field.setAccessible(true); + return (T) field.get(target); + } + + private static void assertIntObjectMapEmpty(Object intMap, String label) + throws ReflectiveOperationException { + Object array = readField(intMap, "array"); + if (array instanceof Object[]) { + for (Object slot : (Object[]) array) { + Assert.assertNull(label + " slot must be null after clear", + slot); + } + return; + } + // Older IntObjectMap implementations expose a size accessor instead + // of a raw array; fall back to that if reflection finds no array. + Object size = Whitebox.invoke(intMap.getClass(), "size", intMap); + Assert.assertEquals(label + " must report size 0 after clear", + 0, ((Number) size).intValue()); + } + @Test public void testResetCachedAllIfReachedCapacity() throws Exception { CachedSchemaTransaction cache = this.cache();