Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.hugegraph.HugeGraphParams;
Expand All @@ -33,6 +36,7 @@
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.meta.MetaDriver;
import org.apache.hugegraph.meta.MetaManager;
import org.apache.hugegraph.meta.MetaManager.SchemaCacheClearEvent;
import org.apache.hugegraph.perf.PerfUtil;
import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.type.HugeType;
Expand All @@ -43,6 +47,29 @@

public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {

// Cache-name prefixes: cacheName() joins each prefix with the graph's
// space-graph name to build the key used to look the cache up in CacheManager.
private static final String ID_CACHE_PREFIX = "schema-id";
private static final String NAME_CACHE_PREFIX = "schema-name";

// MetaDriver doesn't expose unlisten, register the meta listener once.
// Lifecycle: this JVM-global flag is intentionally never reset by
// unlistenChanges() (the underlying gRPC watch is process-wide). If that
// watch is silently dropped after a transport reconnect, recovery is not
// automatic; resetMetaListenerForReconnect() is only a manual hook to let
// the next schema operation install a fresh watch.
private static final AtomicBoolean metaEventListenerRegistered =
new AtomicBoolean(false);
Comment thread
dpol1 marked this conversation as resolved.

// Monitor held by listenSchemaCacheClear() around its check-then-register
// sequence, so two threads cannot both install the meta listener.
private static final Object META_LISTENER_LOCK = new Object();

/**
* Per-JVM identifier emitted with every schema-cache-clear meta event so
* the listener can skip its own echo. Lifecycle: generated once per
* classloader at class init, never reused, regenerated on JVM restart.
* This is not a stable node identity, only a local self-echo filter.
*/
private static final String SCHEMA_CACHE_CLEAR_SOURCE =
UUID.randomUUID().toString();

private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;

Expand All @@ -58,8 +85,8 @@ public CachedSchemaTransactionV2(MetaDriver metaDriver,

final long capacity = graphParams.configuration()
.get(CoreOptions.SCHEMA_CACHE_CAPACITY);
this.idCache = this.cache("schema-id", capacity);
this.nameCache = this.cache("schema-name", capacity);
this.idCache = this.cache(ID_CACHE_PREFIX, capacity);
this.nameCache = this.cache(NAME_CACHE_PREFIX, capacity);

SchemaCaches<SchemaElement> attachment = this.idCache.attachment();
if (attachment == null) {
Expand All @@ -86,11 +113,38 @@ public void close() {
}

/**
 * Fetches the schema cache for this graph from the shared CacheManager.
 *
 * @param prefix   cache-name prefix (e.g. ID_CACHE_PREFIX / NAME_CACHE_PREFIX)
 * @param capacity capacity passed through to CacheManager
 * @return the cache registered under {@code cacheName(prefix, spaceGraphName)}
 */
private Cache<Id, Object> cache(String prefix, long capacity) {
    // Build the name through cacheName() only, so this lookup and the static
    // clearSchemaCache(String) path always agree on the key format. The
    // previous inline concatenation declared `name` a second time.
    final String name = cacheName(prefix, this.graph().spaceGraphName());
    // NOTE: must disable schema cache-expire due to getAllSchema()
    return CacheManager.instance().cache(name, capacity);
}

/**
 * Builds the CacheManager key for a schema cache.
 *
 * @param prefix         cache-name prefix
 * @param spaceGraphName the graph's space-graph name
 * @return {@code prefix}, a dash, then {@code spaceGraphName}
 */
private static String cacheName(String prefix, String spaceGraphName) {
    // String.join renders a null element as "null", same as concatenation.
    return String.join("-", prefix, spaceGraphName);
}

/**
 * Clears the name cache, the id cache and the id cache's attachment that
 * are registered in CacheManager for the given graph. Caches that are not
 * registered are silently skipped.
 *
 * @param spaceGraphName space-graph name used to derive both cache names
 */
private static void clearSchemaCache(String spaceGraphName) {
    Map<String, Cache<Id, Object>> registry = CacheManager.instance().caches();

    // Clear name cache first so the (name -> id -> object) lookup path
    // fails fast instead of returning a stale object backed by an
    // already-empty id cache during the TOCTOU window.
    String nameKey = cacheName(NAME_CACHE_PREFIX, spaceGraphName);
    Cache<Id, Object> byName = registry.get(nameKey);
    if (byName != null) {
        byName.clear();
    }

    String idKey = cacheName(ID_CACHE_PREFIX, spaceGraphName);
    Cache<Id, Object> byId = registry.get(idKey);
    if (byId == null) {
        // No id cache registered for this graph: nothing left to clear.
        return;
    }

    // The attachment is cleared before the id cache itself.
    SchemaCaches<?> attached = byId.attachment();
    if (attached != null) {
        attached.clear();
    }
    byId.clear();
}

private void listenChanges() {
// Listen store event: "store.init", "store.clear", ...
Set<String> storeEvents = ImmutableSet.of(Events.STORE_INIT,
Expand All @@ -100,7 +154,8 @@ private void listenChanges() {
if (storeEvents.contains(event.name())) {
LOG.debug("Graph {} clear schema cache on event '{}'",
this.graph(), event.name());
this.clearCache(true);
boolean notify = !Events.STORE_INIT.equals(event.name());
this.clearCache(notify);
return true;
}
return false;
Expand Down Expand Up @@ -145,12 +200,88 @@ private void listenChanges() {
if (!schemaEventHub.containsListener(Events.CACHE)) {
schemaEventHub.listen(Events.CACHE, this.cacheEventListener);
}

listenSchemaCacheClear();
}

/**
 * Registers {@link #handleSchemaCacheClearEvent} with MetaManager unless the
 * registration flag is already set. The whole check-and-register sequence
 * runs under META_LISTENER_LOCK.
 *
 * @throws RuntimeException if registration fails; runtime exceptions are
 *         rethrown as-is, any other exception is wrapped
 */
private static void listenSchemaCacheClear() {
    synchronized (META_LISTENER_LOCK) {
        if (!metaEventListenerRegistered.get()) {
            try {
                MetaManager.instance().listenSchemaCacheClear(
                        CachedSchemaTransactionV2::handleSchemaCacheClearEvent);
                // Flip the flag only AFTER the listen call returned: a
                // failure leaves it false so the next caller retries, and
                // a caller that sees true knows registration completed.
                metaEventListenerRegistered.set(true);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(
                        "Failed to register schema cache clear listener",
                        e);
            }
        }
    }
}

/**
* Consumer invoked by the MetaManager schema-cache-clear watch. Extracted
* as a package-private static method so end-to-end tests can drive the
* publish -> callback -> {@link #clearSchemaCache(String)} path without
* depending on a live etcd/PD watch.
*/
static <T> void handleSchemaCacheClearEvent(T response) {
List<SchemaCacheClearEvent> events =
MetaManager.instance()
.extractSchemaCacheClearEventsFromResponse(
response);
if (events == null) {
return;
Comment thread
dpol1 marked this conversation as resolved.
}
for (SchemaCacheClearEvent event : events) {
if (SCHEMA_CACHE_CLEAR_SOURCE.equals(event.source())) {
continue;
}
String graphName = event.graph();
LOG.debug("Graph {} clear schema cache on meta event", graphName);
clearSchemaCache(graphName);
}
}

/**
* Manually reset the JVM-global meta listener flag after detecting that
* the MetaManager transport reconnected and dropped the underlying gRPC
* watch. This method is not wired to a MetaManager/MetaDriver reconnect
* callback today; callers must invoke it explicitly after detecting that
* condition. Without such a manual reset {@link #metaEventListenerRegistered}
* would stay {@code true} forever and this JVM would stop receiving
* cross-node schema cache clear events with no error or warning.
*
* <p>TODO: wire this into MetaManager once it exposes a transport
* reconnect callback (e.g. {@code listenReconnect} /
* {@code onTransportReconnect}). Until then it must be invoked
* explicitly by code that detects the reconnect.
*/
public static void resetMetaListenerForReconnect() {
    // Atomically flip true -> false; when the flag was already false there
    // is no registration to invalidate and nothing is logged.
    boolean wasRegistered =
            metaEventListenerRegistered.compareAndSet(true, false);
    if (!wasRegistered) {
        return;
    }
    LOG.warn("Schema cache clear meta listener lost on reconnect - " +
             "will re-register on next schema operation.");
}

/**
 * Clears this transaction's schema caches and optionally announces the
 * clear to other nodes.
 *
 * @param notify when {@code true}, calls maybeNotifySchemaCacheClear()
 *               after the local caches have been cleared
 */
public void clearCache(boolean notify) {
    // Same TOCTOU ordering as clearSchemaCache(String): clear nameCache
    // first, then the array attachment, then idCache last. idCache must NOT
    // be cleared up front, otherwise a name lookup could still resolve
    // against an already-empty id cache.
    this.nameCache.clear();
    this.arrayCaches.clear();
    this.idCache.clear();

    if (notify) {
        this.maybeNotifySchemaCacheClear();
    }
}

private void resetCachedAllIfReachedCapacity() {
Expand Down Expand Up @@ -202,6 +333,8 @@ protected void updateSchema(SchemaElement schema,
super.updateSchema(schema, updateCallback);

this.updateCache(schema);
// Status transitions are internal bookkeeping; notifying here causes a
// broadcast storm for every updateSchemaStatus() call from background jobs.
}
Comment thread
dpol1 marked this conversation as resolved.

@Override
Expand All @@ -210,11 +343,9 @@ protected void addSchema(SchemaElement schema) {

this.updateCache(schema);

if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
MetaManager.instance()
.notifySchemaCacheClear(this.graph().graphSpace(),
this.graph().name());
}
// Schema additions must always propagate to remote nodes regardless
// of TASK_SYNC_DELETION (which only gates removal flows).
this.notifySchemaCacheClear();
}

private void updateCache(SchemaElement schema) {
Expand All @@ -238,13 +369,25 @@ public void removeSchema(SchemaElement schema) {

this.invalidateCache(schema.type(), schema.id());

this.maybeNotifySchemaCacheClear();
}

/**
 * Publishes a schema-cache-clear event unless TASK_SYNC_DELETION is
 * enabled for this graph.
 */
private void maybeNotifySchemaCacheClear() {
    // Only suppress notifications for removal tasks when
    // TASK_SYNC_DELETION=true: the caller propagates cache invalidation
    // synchronously, so the meta-event broadcast would be redundant.
    if (!this.graph().option(CoreOptions.TASK_SYNC_DELETION)) {
        // Publish exactly once, through the source-tagged helper. A second,
        // sourceless notification would duplicate the event and could not
        // be recognised as this JVM's own echo by the listener.
        this.notifySchemaCacheClear();
    }
}

private void notifySchemaCacheClear() {
MetaManager.instance()
.notifySchemaCacheClear(this.graph().graphSpace(),
this.graph().name(),
SCHEMA_CACHE_CLEAR_SOURCE);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nit (non-blocking): consider adding a @VisibleForTesting or explicit visibility note on resetMetaListenerForReconnect()

The method is public static but the Javadoc makes clear it's an escape hatch, not part of the stable API. A @VisibleForTesting (Guava) or at least a @apiNote line clarifying "internal recovery hook; do not call from application code" would prevent external callers from building on what's intended as a temporary shim until MetaManager exposes a reconnect callback.

Suggested change
SCHEMA_CACHE_CLEAR_SOURCE);
/**
* Manually reset the JVM-global meta listener flag after detecting that
* the MetaManager transport reconnected and dropped the underlying gRPC
* watch. This method is not wired to a MetaManager/MetaDriver reconnect
* callback today; callers must invoke it explicitly after detecting that
* condition. Without such a manual reset {@link #metaEventListenerRegistered}
* would stay {@code true} forever and this JVM would stop receiving
* cross-node schema cache clear events with no error or warning.
*
* <p>TODO: wire this into MetaManager once it exposes a transport
* reconnect callback (e.g. {@code listenReconnect} /
* {@code onTransportReconnect}). Until then it must be invoked
* explicitly by code that detects the reconnect.
*
* @apiNote Internal recovery hook. Not part of the stable API; external
* callers should not depend on this method; it may be replaced
* by an automatic reconnect callback in a future release.
*/
public static void resetMetaListenerForReconnect() {

}

@Override
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@
import org.apache.hugegraph.space.SchemaTemplate;
import org.apache.hugegraph.space.Service;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.google.common.collect.ImmutableMap;

public class MetaManager {

private static final Logger LOG = Log.logger(MetaManager.class);

public static final String META_PATH_DELIMITER = "/";
public static final String META_PATH_JOIN = "-";

Expand Down Expand Up @@ -119,6 +124,8 @@ public class MetaManager {
public static final long LOCK_DEFAULT_LEASE = 30L;
public static final long LOCK_DEFAULT_TIMEOUT = 10L;
public static final int RANDOM_USER_ID = 100;
private static final String SCHEMA_CACHE_CLEAR_GRAPH_KEY = "graph";
private static final String SCHEMA_CACHE_CLEAR_SOURCE_KEY = "source";
private static final String META_PATH_URLS = "URLS";
private static final String META_PATH_PD_PEERS = "HSTORE_PD_PEERS";
private static final MetaManager INSTANCE = new MetaManager();
Expand Down Expand Up @@ -380,6 +387,23 @@ public <T> List<String> extractGraphsFromResponse(T response) {
return this.metaDriver.extractValuesFromResponse(response);
}

/**
 * Converts the values carried by a meta-driver response into
 * schema-cache-clear events.
 *
 * @param response driver-specific response object
 * @return the parsed events, with values that
 *         {@link SchemaCacheClearEvent#fromValue} rejects left out;
 *         {@code null} when the driver yields no value list
 */
public <T> List<SchemaCacheClearEvent> extractSchemaCacheClearEventsFromResponse(
        T response) {
    List<String> rawValues = this.metaDriver.extractValuesFromResponse(response);
    if (rawValues == null) {
        return null;
    }

    List<SchemaCacheClearEvent> parsed = new ArrayList<>(rawValues.size());
    for (String raw : rawValues) {
        SchemaCacheClearEvent candidate = SchemaCacheClearEvent.fromValue(raw);
        if (candidate == null) {
            // Unparseable or empty value: skip it, keep the rest.
            continue;
        }
        parsed.add(candidate);
    }
    return parsed;
}

public <T> Map<String, String> extractKVFromResponse(T response) {
return this.metaDriver.extractKVFromResponse(response);
}
Expand Down Expand Up @@ -499,7 +523,12 @@ public void notifyGraphClear(String graphSpace, String graph) {
}

/**
 * Publishes a schema-cache-clear event without a source identifier.
 *
 * @param graphSpace graph space of the affected graph
 * @param graph      name of the affected graph
 */
public void notifySchemaCacheClear(String graphSpace, String graph) {
    // Delegate once to the source-aware overload with a null source. Also
    // calling graphMetaManager directly here would publish the event twice.
    this.notifySchemaCacheClear(graphSpace, graph, null);
}

/**
 * Publishes a schema-cache-clear event by forwarding all three arguments
 * to {@code graphMetaManager.notifySchemaCacheClear}.
 *
 * @param graphSpace graph space of the affected graph
 * @param graph      name of the affected graph
 * @param source     identifier of the publisher, may be {@code null};
 *                   presumably embedded in the event payload so listeners
 *                   can skip their own echo (confirm in GraphMetaManager)
 */
public void notifySchemaCacheClear(String graphSpace, String graph,
String source) {
this.graphMetaManager.notifySchemaCacheClear(graphSpace, graph, source);
}

public void notifyGraphCacheClear(String graphSpace, String graph) {
Expand Down Expand Up @@ -1287,6 +1316,70 @@ public void setWhiteIpStatus(boolean status) {
this.metaDriver.put(key, ((Boolean) status).toString());
}

/**
 * Encodes a schema-cache-clear event value.
 *
 * @param graph  graph identifier carried by the event
 * @param source publisher identifier, may be {@code null} or empty
 * @return the bare {@code graph} string when {@code source} is empty,
 *         otherwise a JSON object holding the graph and source fields
 */
public static String schemaCacheClearEventValue(String graph,
                                                String source) {
    boolean hasSource = !StringUtils.isEmpty(source);
    if (hasSource) {
        ImmutableMap<String, String> payload =
                ImmutableMap.of(SCHEMA_CACHE_CLEAR_GRAPH_KEY, graph,
                                SCHEMA_CACHE_CLEAR_SOURCE_KEY, source);
        return JsonUtil.toJson(payload);
    }
    // No source id: emit the plain graph string, the same shape that
    // SchemaCacheClearEvent.fromValue accepts as the non-JSON format.
    return graph;
}

/**
 * Immutable schema-cache-clear event: the affected graph plus an optional
 * identifier of the publisher.
 */
public static final class SchemaCacheClearEvent {

    private final String graph;
    private final String source;

    private SchemaCacheClearEvent(String graph, String source) {
        this.graph = graph;
        this.source = source;
    }

    /** @return the graph value carried by the event */
    public String graph() {
        return this.graph;
    }

    /** @return the publisher identifier, or {@code null} if none was sent */
    public String source() {
        return this.source;
    }

    /**
     * Parses a raw event value.
     *
     * @param value raw string taken from the meta response
     * @return the parsed event, or {@code null} when the value is empty,
     *         is not valid JSON, or lacks the graph field
     */
    @SuppressWarnings("unchecked")
    static SchemaCacheClearEvent fromValue(String value) {
        if (StringUtils.isEmpty(value)) {
            return null;
        }

        // Compatibility: events published before source-id support stored
        // only the graph name as a plain string. Keep accepting that format
        // so mixed-version clusters can consume old/new schema-cache-clear
        // events during rolling upgrades.
        boolean plainGraphName = value.charAt(0) != '{';
        if (plainGraphName) {
            return new SchemaCacheClearEvent(value, null);
        }

        Map<String, Object> fields;
        try {
            fields = JsonUtil.fromJson(value, Map.class);
        } catch (RuntimeException e) {
            LOG.debug("Malformed schema-cache-clear payload, ignoring: {}",
                      value, e);
            return null;
        }

        Object graphField = fields.get(SCHEMA_CACHE_CLEAR_GRAPH_KEY);
        if (graphField == null) {
            LOG.debug("Schema-cache-clear payload missing '{}' field: {}",
                      SCHEMA_CACHE_CLEAR_GRAPH_KEY, value);
            return null;
        }

        // The source field is optional; absent means "no publisher id".
        Object sourceField = fields.get(SCHEMA_CACHE_CLEAR_SOURCE_KEY);
        String sourceId = null;
        if (sourceField != null) {
            sourceId = sourceField.toString();
        }
        return new SchemaCacheClearEvent(graphField.toString(), sourceId);
    }
}

public enum MetaDriverType {
ETCD,
PD
Expand Down
Loading
Loading