From dbb90d5d6c24f8264bb0c2bcd5e3dea35b5c7b4f Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 15:29:05 +0300 Subject: [PATCH 01/33] WIP --- .../util/lang/GridMetadataAwareAdapter.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java index 48fe4c5003201..7d0206cfa66c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java @@ -42,7 +42,7 @@ public enum EntryKey { //keys sorted by usage rate, descending. CACHE_EVICTION_MANAGER_KEY(2); /** key. */ - private int key; + private final int key; /** * @param key key @@ -63,7 +63,7 @@ public int key() { /** Attributes. */ @GridToStringInclude(sensitive = true) - private Object[] data = null; + private Object[] data; /** * Copies all metadata from another instance. @@ -107,10 +107,10 @@ public void copyMeta(Object[] data) { assert val != null; synchronized (this) { - if (this.data == null) - this.data = new Object[key + 1]; - else if (this.data.length <= key) - this.data = Arrays.copyOf(this.data, key + 1); + if (data == null) + data = new Object[key + 1]; + else if (data.length <= key) + data = Arrays.copyOf(data, key + 1); V old = (V)data[key]; @@ -239,7 +239,7 @@ public boolean hasMeta(int key, V val) { assert val != null; synchronized (this) { - V v = (V)meta(key); + V v = meta(key); if (v == null) return addMeta(key, val); @@ -261,7 +261,7 @@ public V addMetaIfAbsent(int key, V val) { assert val != null; synchronized (this) { - V v = (V)meta(key); + V v = meta(key); if (v == null) addMeta(key, v = val); @@ -285,7 +285,7 @@ public V addMetaIfAbsent(int key, V val) { assert c != null; synchronized (this) { - V v = (V)meta(key); + V v = meta(key); if (v == null && c != null) try { From 6b8d458f0a3ac5bcc0d9297d4487751401f73b15 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 15:54:56 +0300 Subject: [PATCH 02/33] WIP --- .../internal/util/lang/GridMetadataAwareAdapter.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java index 7d0206cfa66c1..a530534c76621 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java @@ -36,10 +36,7 @@ public enum EntryKey { //keys sorted by usage rate, descending. CACHE_STORE_MANAGER_KEY(0), /** Predefined key. */ - CACHE_EVICTABLE_ENTRY_KEY(1), - - /** Predefined key. */ - CACHE_EVICTION_MANAGER_KEY(2); + CACHE_EVICTABLE_ENTRY_KEY(1); /** key. */ private final int key; @@ -287,7 +284,7 @@ public V addMetaIfAbsent(int key, V val) { synchronized (this) { V v = meta(key); - if (v == null && c != null) + if (v == null) try { addMeta(key, v = c.call()); } From 4bfe4287cf15a02740a86d73c0933015986fabba Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 15:59:35 +0300 Subject: [PATCH 03/33] WIP --- .../processors/cache/transactions/IgniteTxAdapter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0bd7e0028b276..79c48b894bdd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -326,7 +326,7 @@ protected IgniteTxAdapter( this.txSize = txSize; this.subjId = subjId; this.taskNameHash = taskNameHash; - this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); + deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); nodeId = cctx.discovery().localNode().id(); @@ -384,7 +384,7 @@ protected IgniteTxAdapter( this.txSize = txSize; this.subjId = subjId; this.taskNameHash = taskNameHash; - this.deploymentLdrId = null; + deploymentLdrId = null; implicit = false; loc = false; @@ -903,7 +903,7 @@ public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Thr state = MARKED_ROLLBACK; if (log.isDebugEnabled()) - log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); + log.debug("Changed transaction state [prev=" + prev + ", new=" + state + ", tx=" + this + ']'); notifyAll(); } From 592f910da6eaa356b535679fa0e32455e185867e Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:01:23 +0300 Subject: [PATCH 04/33] WIP --- .../processors/cache/transactions/IgniteTxAdapter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 79c48b894bdd6..7c2d3364f15c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1688,9 +1688,9 @@ else if (txEntry.hasOldValue()) IgniteThread.onEntryProcessorEntered(true); try { - EntryProcessor processor = t.get1(); + EntryProcessor proc = t.get1(); - procRes = processor.process(invokeEntry, t.get2()); + procRes = proc.process(invokeEntry, t.get2()); val = invokeEntry.getValue(); From 0edd8c9fb6bed8d37576363e7d36517271fb7a05 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:03:50 +0300 Subject: [PATCH 05/33] WIP --- .../internal/processors/cache/transactions/IgniteTxAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 7c2d3364f15c3..ce497ef6d953f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1441,7 +1441,7 @@ protected final void batchStoreCommit(Iterable writeEntries) thro IgniteBiTuple res = applyTransformClosures(e, false, null); - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); GridCacheOperation op = res.get1(); KeyCacheObject key = e.key(); From 90d44223e1a31a90ca67c6ad090aadfbaeac214d Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:06:40 +0300 Subject: [PATCH 06/33] WIP --- .../processors/cache/transactions/IgniteTxAdapter.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index ce497ef6d953f..98a5ed276c890 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1277,6 +1277,14 @@ private void recordStateChangedEvent(TransactionState state) { break; } + + case PREPARING: + case MARKED_ROLLBACK: + case PREPARED: + case COMMITTING: + case ROLLING_BACK: + case UNKNOWN: + break; } } From 26f7121dfa739b558f094acc0fb4aff1f68a8191 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:20:46 +0300 Subject: [PATCH 07/33] WIP --- .../cache/transactions/IgniteTxAdapter.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 98a5ed276c890..d362096498fdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -706,13 +706,7 @@ public boolean remote() { if (invalidParts == null) invalidParts = new HashMap<>(); - Set parts = invalidParts.get(cacheId); - - if (parts == null) { - parts = new HashSet<>(); - - invalidParts.put(cacheId, parts); - } + Set parts = invalidParts.computeIfAbsent(cacheId, k -> new HashSet<>()); parts.add(part); @@ -875,7 +869,7 @@ public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Thr /** {@inheritDoc} */ @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) { - GridCacheContext cacheCtx = entry.context(); + GridCacheContext cacheCtx = entry.context(); IgniteTxEntry txEntry = entry(entry.txKey()); @@ -1384,7 +1378,7 @@ protected boolean isWriteToStoreFromDhtValid(Collection store protected void sessionEnd(final Collection stores, boolean commit) throws IgniteCheckedException { Iterator it = stores.iterator(); - Set visited = new GridSetWrapper<>(new IdentityHashMap()); + Set visited = new GridSetWrapper<>(new IdentityHashMap<>()); while (it.hasNext()) { CacheStoreManager store = it.next(); @@ -1618,7 +1612,7 @@ protected IgniteBiTuple applyTransformClosures( @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException { assert txEntry.op() != TRANSFORM || !F.isEmpty(txEntry.entryProcessors()) : txEntry; - GridCacheContext cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); assert cacheCtx != null; @@ -1821,7 +1815,7 @@ else if (op == UPDATE) GridCacheVersionedEntryEx oldEntry = old.versionedEntry(txEntry.keepBinary()); // Construct new entry info. - GridCacheContext entryCtx = txEntry.context(); + GridCacheContext entryCtx = txEntry.context(); GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry( entryCtx, @@ -1853,7 +1847,7 @@ else if (op == DELETE && resVal != null) * @return {@code True} if entry is locally mapped as a primary or back up node. */ protected boolean isNearLocallyMapped(IgniteTxEntry e, boolean primaryOnly) { - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); if (!cacheCtx.isNear()) return false; From 1f68c625f1d14ca70efc62db95c7302147fc8583 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:29:53 +0300 Subject: [PATCH 08/33] WIP --- .../processors/cache/transactions/IgniteTxLocalAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 1f3a3f02b8845..041cc08a254bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1845,6 +1845,6 @@ private interface NearEntryUpdateClojure { * @throws IgniteCheckedException If operation is failed. * @throws GridCacheEntryRemovedException If entry is removed. */ - public void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException; + void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException; } } From 59d5c1bc6e25b8562be267d1b18038e054b1a3c9 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:31:11 +0300 Subject: [PATCH 09/33] WIP --- .../transactions/IgniteTxLocalAdapter.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 041cc08a254bd..e4c432e670b48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -889,9 +889,9 @@ assert ownsLock(txEntry.cached()) : boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config()); if (persistenceEnabled) { - GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); - dbManager.getCheckpointer().skipCheckpointOnNodeStop(true); + dbMgr.getCheckpointer().skipCheckpointOnNodeStop(true); } throw ex; @@ -1280,9 +1280,9 @@ protected final void addInvokeResult(IgniteTxEntry txEntry, CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(txEntry.key(), key0, cacheVal, val0, ver, txEntry.keepBinary(), txEntry.cached()); - EntryProcessor entryProcessor = t.get1(); + EntryProcessor entryProc = t.get1(); - res = entryProcessor.process(invokeEntry, t.get2()); + res = entryProc.process(invokeEntry, t.get2()); val0 = invokeEntry.getValue(txEntry.keepBinary()); @@ -1364,7 +1364,7 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { * @param val Value. * @param expiryPlc Explicitly specified expiry policy. * @param invokeArgs Optional arguments for EntryProcessor. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param entry Cache entry. * @param filter Filter. * @param filtersSet {@code True} if filter should be marked as set. @@ -1376,7 +1376,7 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { */ public final IgniteTxEntry addEntry(GridCacheOperation op, @Nullable CacheObject val, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, Object[] invokeArgs, GridCacheEntryEx entry, @Nullable ExpiryPolicy expiryPlc, @@ -1407,12 +1407,12 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, IgniteTxEntry txEntry; if (old != null) { - if (entryProcessor != null) { + if (entryProc != null) { assert val == null; assert op == TRANSFORM; // Will change the op. - old.addEntryProcessor(entryProcessor, invokeArgs); + old.addEntryProcessor(entryProc, invokeArgs); } else { assert old.op() != TRANSFORM; @@ -1450,7 +1450,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, this, op, val, - EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor), + EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProc), invokeArgs, hasDrTtl ? drTtl : -1L, entry, From f58bbe460a3f09edd6539ab9471f433f591bf7d7 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:34:43 +0300 Subject: [PATCH 10/33] WIP --- .../transactions/IgniteTxLocalAdapter.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index e4c432e670b48..296823e393398 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -1711,10 +1710,8 @@ protected PostLockClosure1(T arg, boolean commit) { setRollbackOnly(); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw new GridClosureException(e); - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw new GridClosureException(e); }); throw new GridClosureException(e); @@ -1730,10 +1727,8 @@ protected PostLockClosure1(T arg, boolean commit) { ); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw ex; - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw ex; }); throw ex; @@ -1763,10 +1758,8 @@ protected PostLockClosure1(T arg, boolean commit) { } catch (final IgniteCheckedException ex) { if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw new GridClosureException(ex); - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw new GridClosureException(ex); }); throw new GridClosureException(ex); From 6290589ec7fad40b5f07934caf2dd77e4eb683a7 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:37:36 +0300 Subject: [PATCH 11/33] WIP --- .../processors/cache/distributed/dht/GridDhtTxLocalAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index f82ad2b8b23ea..734d41dba6c33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -251,7 +251,7 @@ public void explicitLock(boolean explicitLock) { * @return Versions for all pending locks that were in queue before tx locks were released. */ Collection pendingVersions() { - return pendingVers == null ? Collections.emptyList() : pendingVers; + return pendingVers == null ? Collections.emptyList() : pendingVers; } /** From db416a65ba6ebce54c1b846465ec04feccb6532c Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:39:34 +0300 Subject: [PATCH 12/33] WIP --- .../cache/distributed/dht/GridDhtTxLocalAdapter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 734d41dba6c33..2289b371ba63d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -279,7 +279,7 @@ protected void mapExplicitLocks() { for (IgniteTxEntry e : allEntries()) { assert e.cached() != null; - GridCacheContext cacheCtx = e.cached().context(); + GridCacheContext cacheCtx = e.cached().context(); if (cacheCtx.isNear()) continue; @@ -470,9 +470,9 @@ private void addMapping( checkInternal(e.txKey()); - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); - GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); try { IgniteTxEntry existing = entry(e.txKey()); @@ -575,7 +575,7 @@ IgniteInternalFuture lockAllAsync( try { AffinityTopologyVersion topVer = topologyVersion(); - GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); // Enlist locks into transaction. for (int i = 0; i < entries.size(); i++) { @@ -911,7 +911,7 @@ else if (updateLockFuture(fut, ROLLBACK_FUT)) /** * @param prepFut Prepare future. - * @return If transaction if finished on prepare step returns future which is completed after transaction finish. + * @return If transaction is finished on prepare step returns future which is completed after transaction finish. */ protected final IgniteInternalFuture chainOnePhasePrepare( final GridDhtTxPrepareFuture prepFut) { From f3672b4e6f6d1b615608fb7b80dc01413aa053c0 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:40:35 +0300 Subject: [PATCH 13/33] WIP --- .../processors/cache/distributed/near/GridNearTxLocal.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c5555d97ac930..812be676fecd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3377,7 +3377,7 @@ public IgniteInternalFuture requestSnapshot() { MvccCoordinator crd = prc.currentCoordinator(); synchronized (this) { - this.crdVer = crd.version(); + crdVer = crd.version(); } if (crd.local()) @@ -3776,7 +3776,7 @@ public long systemTimeCurrent() { if (!commitOrRollbackTime.compareAndSet(0, System.nanoTime() - commitOrRollbackStartTime.get())) return res; - long systemTimeMillis = U.nanosToMillis(this.systemTime.get()); + long systemTimeMillis = U.nanosToMillis(systemTime.get()); long totalTimeMillis = System.currentTimeMillis() - startTime(); // In some cases totalTimeMillis can be less than systemTimeMillis, as they are calculated with different precision. @@ -3894,7 +3894,7 @@ public IgniteInternalFuture prepareNearTxLocal() { if (trackTimeout) { prepFut.listen(new IgniteInClosure>() { @Override public void apply(IgniteInternalFuture f) { - GridNearTxLocal.this.removeTimeoutHandler(); + removeTimeoutHandler(); } }); } From 6f3394e18e7138725b8f339f764df59b1996339f Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:43:33 +0300 Subject: [PATCH 14/33] WIP --- .../distributed/near/GridNearTxLocal.java | 104 +++++++++--------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 812be676fecd4..f568496c8dd83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -191,13 +191,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * Counts how much time this transaction has spent on system calls, in nanoseconds. */ - private final AtomicLong systemTime = new AtomicLong(0); + private final AtomicLong sysTime = new AtomicLong(0); /** * Stores the nano time value when current system time has started, or 0 if no system section * is running currently. */ - private final AtomicLong systemStartTime = new AtomicLong(0); + private final AtomicLong sysStartTime = new AtomicLong(0); /** * Stores the nano time value when prepare step has started, or 0 if no prepare step @@ -492,20 +492,20 @@ public final IgniteInternalFuture putAsync( /** * @param cacheCtx Cache context. * @param key Key. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for entry processor. * @return Operation future. */ public IgniteInternalFuture invokeAsync(GridCacheContext cacheCtx, @Nullable AffinityTopologyVersion entryTopVer, K key, - EntryProcessor entryProcessor, + EntryProcessor entryProc, Object... invokeArgs) { return (IgniteInternalFuture)putAsync0(cacheCtx, entryTopVer, key, null, - entryProcessor, + entryProc, invokeArgs, true, null); @@ -593,7 +593,7 @@ public IgniteInternalFuture removeAllAsync( * @param cacheCtx Cache context. * @param key Key. * @param val Value. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Return value flag. * @param filter Filter. @@ -604,7 +604,7 @@ private IgniteInternalFuture putAsync0( @Nullable AffinityTopologyVersion entryTopVer, K key, @Nullable V val, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, @Nullable final Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate filter @@ -613,7 +613,7 @@ private IgniteInternalFuture putAsync0( if (cacheCtx.mvccEnabled()) return mvccPutAllAsync0(cacheCtx, Collections.singletonMap(key, val), - entryProcessor == null ? null : Collections.singletonMap(key, entryProcessor), invokeArgs, retval, filter); + entryProc == null ? null : Collections.singletonMap(key, entryProc), invokeArgs, retval, filter); try { beforePut(cacheCtx, retval, false); @@ -636,7 +636,7 @@ private IgniteInternalFuture putAsync0( cacheKey, val, opCtx != null ? opCtx.expiry() : null, - entryProcessor, + entryProc, invokeArgs, retval, filters, @@ -670,7 +670,7 @@ private IgniteInternalFuture putAsync0( IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, timeout, this, - /*read*/entryProcessor != null, // Needed to force load from store. + /*read*/entryProc != null, // Needed to force load from store. retval, isolation, isInvalidate(), @@ -799,9 +799,9 @@ private IgniteInternalFuture mvccPutAllAsync0( } Object val = map == null ? null : map.get(key); - EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null; + EntryProcessor entryProc = transform ? invokeMap.get(key) : null; - if (val == null && entryProcessor == null) { + if (val == null && entryProc == null) { setRollbackOnly(); throw new NullPointerException("Null value."); @@ -810,7 +810,7 @@ private IgniteInternalFuture mvccPutAllAsync0( KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); if (transform) - enlisted.put(cacheKey, new GridInvokeValue(entryProcessor, invokeArgs)); + enlisted.put(cacheKey, new GridInvokeValue(entryProc, invokeArgs)); else enlisted.put(cacheKey, val); } @@ -1025,7 +1025,7 @@ private IgniteInternalFuture putAllAsync0( * @param cacheKey Key to enlist. * @param val Value. * @param expiryPlc Explicitly specified expiry policy for entry. - * @param entryProcessor Entry processor (for invoke operation). + * @param entryProc Entry processor (for invoke operation). * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Flag indicating whether a value should be returned. * @param filter User filters. @@ -1041,7 +1041,7 @@ private IgniteInternalFuture enlistWrite( KeyCacheObject cacheKey, Object val, @Nullable ExpiryPolicy expiryPlc, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, @Nullable Object[] invokeArgs, final boolean retval, final CacheEntryPredicate[] filter, @@ -1064,7 +1064,7 @@ private IgniteInternalFuture enlistWrite( final boolean needVal = retval || hasFilters; final boolean needReadVer = needVal && (serializable() && optimistic()); - if (entryProcessor != null) + if (entryProc != null) transform = true; GridCacheVersion drVer = dataCenterId != null ? cacheCtx.cache().nextVersion(dataCenterId) : null; @@ -1073,7 +1073,7 @@ private IgniteInternalFuture enlistWrite( entryTopVer, cacheKey, val, - entryProcessor, + entryProc, invokeArgs, expiryPlc, retval, @@ -1104,7 +1104,7 @@ private IgniteInternalFuture enlistWrite( needReadVer, false, hasFilters, - /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, + /*read through*/(entryProc != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, keepBinary, recovery, @@ -1218,7 +1218,7 @@ private IgniteInternalFuture enlistWrite( } Object val = rmv || lookup == null ? null : lookup.get(key); - EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); + EntryProcessor entryProc = invokeMap == null ? null : invokeMap.get(key); GridCacheVersion drVer; long drTtl; @@ -1251,7 +1251,7 @@ else if (dataCenterId != null) { drExpireTime = -1L; } - if (!rmv && val == null && entryProcessor == null) { + if (!rmv && val == null && entryProc == null) { setRollbackOnly(); throw new NullPointerException("Null value."); @@ -1263,7 +1263,7 @@ else if (dataCenterId != null) { entryTopVer, cacheKey, val, - entryProcessor, + entryProc, invokeArgs, expiryPlc, retval, @@ -1336,7 +1336,7 @@ else if (dataCenterId != null) { * @param cacheCtx Cache context. * @param cacheKey Key. * @param val Value. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Explicitly specified expiry policy for entry. * @param retval Return value flag. @@ -1358,7 +1358,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, @Nullable AffinityTopologyVersion entryTopVer, final KeyCacheObject cacheKey, @Nullable final Object val, - @Nullable final EntryProcessor entryProcessor, + @Nullable final EntryProcessor entryProc, @Nullable final Object[] invokeArgs, @Nullable final ExpiryPolicy expiryPlc, final boolean retval, @@ -1377,7 +1377,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, ) throws IgniteCheckedException { boolean loadMissed = false; - final boolean rmv = val == null && entryProcessor == null; + final boolean rmv = val == null && entryProc == null; IgniteTxKey txKey = cacheCtx.txKey(cacheKey); @@ -1417,7 +1417,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, this, /*metrics*/retval, /*events*/retval, - entryProcessor, + entryProc, resolveTaskName(), null, keepBinary, @@ -1443,7 +1443,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, /*read through*/false, /*metrics*/retval, /*events*/retval, - entryProcessor, + entryProc, resolveTaskName(), null, keepBinary); @@ -1463,7 +1463,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, old = entry.rawGet(); final GridCacheOperation op = rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + entryProc != null ? TRANSFORM : old != null ? UPDATE : CREATE; if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { ret.set( @@ -1478,7 +1478,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, if (optimistic() && serializable()) { txEntry = addEntry(op, old, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -1530,7 +1530,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, txEntry = addEntry(op, cVal, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -1622,7 +1622,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, } } else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) + if (entryProc == null && txEntry.op() == TRANSFORM) throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); @@ -1639,7 +1639,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, return loadMissed; } - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + GridCacheOperation op = rmv ? DELETE : entryProc != null ? TRANSFORM : v != null ? UPDATE : CREATE; CacheObject cVal = cacheCtx.toCacheObject(val); @@ -1649,7 +1649,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, txEntry = addEntry(op, cVal, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -3756,13 +3756,13 @@ private void readyNearLock(IgniteTxEntry txEntry, * @return Amount of time in milliseconds. */ public long systemTimeCurrent() { - long systemTime0 = systemTime.get(); + long sysTime0 = sysTime.get(); - long systemStartTime0 = systemStartTime.get(); + long sysStartTime0 = sysStartTime.get(); - long t = systemStartTime0 == 0 ? 0 : (System.nanoTime() - systemStartTime0); + long t = sysStartTime0 == 0 ? 0 : (System.nanoTime() - sysStartTime0); - return U.nanosToMillis(systemTime0 + t); + return U.nanosToMillis(sysTime0 + t); } /** {@inheritDoc} */ @@ -3776,13 +3776,13 @@ public long systemTimeCurrent() { if (!commitOrRollbackTime.compareAndSet(0, System.nanoTime() - commitOrRollbackStartTime.get())) return res; - long systemTimeMillis = U.nanosToMillis(systemTime.get()); + long sysTimeMillis = U.nanosToMillis(sysTime.get()); long totalTimeMillis = System.currentTimeMillis() - startTime(); // In some cases totalTimeMillis can be less than systemTimeMillis, as they are calculated with different precision. - long userTimeMillis = Math.max(totalTimeMillis - systemTimeMillis, 0); + long userTimeMillis = Math.max(totalTimeMillis - sysTimeMillis, 0); - cctx.txMetrics().onNearTxComplete(systemTimeMillis, userTimeMillis); + cctx.txMetrics().onNearTxComplete(sysTimeMillis, userTimeMillis); boolean willBeSkipped = txDumpsThrottling == null || txDumpsThrottling.skipCurrent(); @@ -3797,7 +3797,7 @@ public long systemTimeCurrent() { && ThreadLocalRandom.current().nextDouble() <= transactionTimeDumpSamplesCoefficient; if (randomlyChosen || isLong) { - String txDump = completedTransactionDump(state, systemTimeMillis, userTimeMillis, isLong); + String txDump = completedTransactionDump(state, sysTimeMillis, userTimeMillis, isLong); if (isLong) log.warning(txDump); @@ -3818,27 +3818,27 @@ else if (txDumpsThrottling != null) * Builds dump string for completed transaction. * * @param state Transaction state. - * @param systemTimeMillis System time in milliseconds. + * @param sysTimeMillis System time in milliseconds. * @param userTimeMillis User time in milliseconds. * @param isLong Whether the dumped transaction is long running or not. * @return Dump string. */ private String completedTransactionDump( TransactionState state, - long systemTimeMillis, + long sysTimeMillis, long userTimeMillis, boolean isLong ) { long cacheOperationsTimeMillis = - U.nanosToMillis(systemTime.get() - prepareTime.get() - commitOrRollbackTime.get()); + U.nanosToMillis(sysTime.get() - prepareTime.get() - commitOrRollbackTime.get()); GridStringBuilder warning = new GridStringBuilder(isLong ? "Long transaction time dump " : "Transaction time dump ") .a("[startTime=") .a(IgniteUtils.DEBUG_DATE_FMT.format(Instant.ofEpochMilli(startTime))) .a(", totalTime=") - .a(systemTimeMillis + userTimeMillis) + .a(sysTimeMillis + userTimeMillis) .a(", systemTime=") - .a(systemTimeMillis) + .a(sysTimeMillis) .a(", userTime=") .a(userTimeMillis) .a(", cacheOperationsTime=") @@ -3927,11 +3927,11 @@ public IgniteInternalFuture prepareNearTxLocal() { } /** - * @param awaitLastFuture If true - method will wait until transaction finish every action started before. + * @param awaitLastFut If true - method will wait until transaction finish every action started before. * @throws IgniteCheckedException If failed. */ - public final void prepare(boolean awaitLastFuture) throws IgniteCheckedException { - if (awaitLastFuture) + public final void prepare(boolean awaitLastFut) throws IgniteCheckedException { + if (awaitLastFut) txState().awaitLastFuture(cctx); prepareNearTxLocal().get(); @@ -5016,17 +5016,17 @@ private long timeMillis(AtomicLong atomicNanoTime) { public void enterSystemSection() { // Setting systemStartTime only if it equals 0, otherwise it means that we are already in system section // and should do nothing. - systemStartTime.compareAndSet(0, System.nanoTime()); + sysStartTime.compareAndSet(0, System.nanoTime()); } /** * Leaves the section when system time for this transaction is counted. */ public void leaveSystemSection() { - long systemStartTime0 = systemStartTime.getAndSet(0); + long sysStartTime0 = sysStartTime.getAndSet(0); - if (systemStartTime0 > 0) - systemTime.addAndGet(System.nanoTime() - systemStartTime0); + if (sysStartTime0 > 0) + sysTime.addAndGet(System.nanoTime() - sysStartTime0); } /** From edbfd59d3fb61e1fcf591f4e8761ff042a371273 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:44:22 +0300 Subject: [PATCH 15/33] WIP --- .../processors/cache/distributed/near/GridNearTxLocal.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f568496c8dd83..f60dff1fab74b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3970,7 +3970,7 @@ public IgniteInternalFuture commitNearTxLocalAsync() { // Properly finish prepFut in case of unchecked error. assert prepareFut != null; // Prep future must be set. - ((GridNearTxPrepareFutureAdapter)prepFut).onDone(t); + ((GridFutureAdapter)prepFut).onDone(t); } prepareFut.listen(new CI1>() { From 0d83236bb3566ee63cfa9e6f48788edbb028f411 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:47:23 +0300 Subject: [PATCH 16/33] WIP --- .../processors/cache/distributed/near/GridNearTxLocal.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f60dff1fab74b..7f6301b476531 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3633,7 +3633,7 @@ private void readyNearLock(IgniteTxEntry txEntry, Collection rolledbackVers ) { while (true) { - GridCacheContext cacheCtx = txEntry.cached().context(); + GridCacheContext cacheCtx = txEntry.cached().context(); assert cacheCtx.isNear(); @@ -4607,7 +4607,7 @@ public void close(boolean clearThreadMap) throws IgniteCheckedException { for (Map.Entry e : accessMap.entrySet()) { if (e.getValue().entries() != null) { - GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); + GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); if (cctx0.isNear()) cctx0.near().dht().sendTtlUpdateRequest(e.getValue()); From d4482a8e6bab7d4ca64805cab2c5f10e44d3e6c2 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:51:15 +0300 Subject: [PATCH 17/33] WIP --- .../cache/distributed/near/GridNearTxLocal.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 7f6301b476531..e5f1949a1f7b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -541,11 +541,7 @@ public IgniteInternalFuture putAllDrAsync( GridCacheContext cacheCtx, Map drMap ) { - Map map = F.viewReadOnly(drMap, new IgniteClosure() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); + Map map = F.viewReadOnly(drMap, (IgniteClosure)GridCacheDrInfo::value); return this.putAllAsync0(cacheCtx, null, @@ -2755,7 +2751,7 @@ private Collection enlistRead( // First time access within transaction. else { if (lockKeys == null && !skipVals) - lockKeys = single ? Collections.singleton(key) : new ArrayList(keysCnt); + lockKeys = single ? Collections.singleton(key) : new ArrayList<>(keysCnt); if (!single && !skipVals) lockKeys.add(key); From aacbd641e23306f37138bb3d8ce3f25b2a7aff05 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:53:55 +0300 Subject: [PATCH 18/33] WIP --- .../distributed/near/GridNearTxLocal.java | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index e5f1949a1f7b1..0f52454686360 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -4458,39 +4458,6 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c ); } - /** - * Gets cache entry for given key. - * - * @param cacheCtx Cache context. - * @param key Key. - * @return Cache entry. - */ - protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) { - if (cacheCtx.isColocated()) { - IgniteTxEntry txEntry = entry(key); - - if (txEntry == null) - return cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true); - - GridCacheEntryEx cached = txEntry.cached(); - - assert cached != null; - - if (cached.detached()) - return cached; - - if (cached.obsoleteVersion() != null) { - cached = cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true); - - txEntry.cached(cached); - } - - return cached; - } - else - return cacheCtx.cache().entryEx(key.key()); - } - /** {@inheritDoc} */ @Override protected GridCacheEntryEx entryEx( GridCacheContext cacheCtx, From 8916918d582cd671e94d03e0cb77fbf7ed820e1e Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:56:43 +0300 Subject: [PATCH 19/33] WIP --- .../distributed/near/GridNearTxLocal.java | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 0f52454686360..606aa75631aa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1106,16 +1106,14 @@ private IgniteInternalFuture enlistWrite( recovery, expiryPlc); - loadFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + loadFut.listen((IgniteInternalFuture fut) -> { + try { + fut.get(); - finishFuture(enlistFut, null, true); - } - catch (IgniteCheckedException e) { - finishFuture(enlistFut, e, true); - } + finishFuture(enlistFut, null, true); + } + catch (IgniteCheckedException e) { + finishFuture(enlistFut, e, true); } }); @@ -1304,16 +1302,14 @@ else if (dataCenterId != null) { recovery, expiryPlc); - loadFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + loadFut.listen((IgniteInternalFuture fut) -> { + try { + fut.get(); - finishFuture(enlistFut, null, true); - } - catch (IgniteCheckedException e) { - finishFuture(enlistFut, e, true); - } + finishFuture(enlistFut, null, true); + } + catch (IgniteCheckedException e) { + finishFuture(enlistFut, e, true); } }); From 02da18475aa5a70b53747191152c4147457c1ec7 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 16:58:10 +0300 Subject: [PATCH 20/33] WIP --- .../distributed/near/GridNearTxLocal.java | 98 +++++++++---------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 606aa75631aa6..805dd1429aba2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -2903,74 +2903,70 @@ private IgniteInternalFuture loadMissing( final boolean recovery, final ExpiryPolicy expiryPlc) { GridInClosure3 c = - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + (KeyCacheObject key, @Nullable Object val, @Nullable GridCacheVersion loadVer) -> { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - assert e != null; + assert e != null; - if (needReadVer) { - assert loadVer != null; + if (needReadVer) { + assert loadVer != null; - e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); - } + e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - if (singleRmv) { - assert !hasFilters && !retval; - assert val == null || Boolean.TRUE.equals(val) : val; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - ret.set(cacheCtx, null, val != null, keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } - else { - CacheObject cacheVal = cacheCtx.toCacheObject(val); - - if (e.op() == TRANSFORM) { - GridCacheVersion ver; + ret.set(cacheCtx, null, val != null, keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - e.readValue(cacheVal); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; + e.readValue(cacheVal); - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; - ver = null; - } + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - addInvokeResult(e, cacheVal, ret, ver); + ver = null; } - else { - boolean success; - if (hasFilters) { - success = isAll(e.context(), key, cacheVal, filter); + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success; - if (!success) { - e.value(cacheVal, false, false); + if (hasFilters) { + success = isAll(e.context(), key, cacheVal, filter); - e.op(READ); - } - } - else - success = true; + if (!success) { + e.value(cacheVal, false, false); - ret.set( - cacheCtx, - cacheVal, - success, - keepBinary, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId) - ); + e.op(READ); + } } + else + success = true; + + ret.set( + cacheCtx, + cacheVal, + success, + keepBinary, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId) + ); } } }; From 8da1b44e49a17978dc01627366350b6bb24b967b Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:22:12 +0300 Subject: [PATCH 21/33] WIP --- .../distributed/near/GridNearTxLocal.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 805dd1429aba2..48a5078a79130 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3086,7 +3086,7 @@ private void onException() { * @return Future with {@code True} value if loading took place. */ private IgniteInternalFuture loadMissing( - final GridCacheContext cacheCtx, + final GridCacheContext cacheCtx, AffinityTopologyVersion topVer, boolean readThrough, final Collection keys, @@ -3104,28 +3104,27 @@ private IgniteInternalFuture loadMissing( if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, - topVer, - keys, - readThrough, - needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() && serializable() && readThrough), - /*deserializeBinary*/false, - recovery, - expiryPlc0, - skipVals, - needVer).chain(new C1>, Void>() { - @Override public Void apply(IgniteInternalFuture> f) { - try { - Map map = f.get(); + topVer, + keys, + readThrough, + needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() && serializable() && readThrough), + /*deserializeBinary*/false, + recovery, + expiryPlc0, + skipVals, + needVer) + .chain((IgniteInternalFuture> f) -> { + try { + Map map = f.get(); - processLoaded(map, keys, needVer, c); + processLoaded(map, keys, needVer, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } From 3f782a831abad2656b5394453a0ecc90da5ae82e Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:24:45 +0300 Subject: [PATCH 22/33] WIP --- .../distributed/near/GridNearTxLocal.java | 50 ++++++++----------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 48a5078a79130..9cb73dc8399fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -99,7 +99,6 @@ import org.apache.ignite.internal.util.lang.GridInClosure3; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; @@ -3185,20 +3184,18 @@ private IgniteInternalFuture loadMissing( recovery, null, label() - ).chain(new C1, Void>() { - @Override public Void apply(IgniteInternalFuture f) { - try { - Object val = f.get(); + ).chain((IgniteInternalFuture f) -> { + try { + Object val = f.get(); - processLoaded(key, val, needVer, skipVals, c); + processLoaded(key, val, needVer, skipVals, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } @@ -3217,20 +3214,18 @@ private IgniteInternalFuture loadMissing( /*keepCacheObject*/true, label(), null - ).chain(new C1>, Void>() { - @Override public Void apply(IgniteInternalFuture> f) { - try { - Map map = f.get(); + ).chain((IgniteInternalFuture> f) -> { + try { + Map map = f.get(); - processLoaded(map, keys, needVer, c); + processLoaded(map, keys, needVer, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } @@ -3878,13 +3873,8 @@ public IgniteInternalFuture prepareNearTxLocal() { if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; - if (trackTimeout) { - prepFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture f) { - removeTimeoutHandler(); - } - }); - } + if (trackTimeout) + prepFut.listen((IgniteInternalFuture f) -> removeTimeoutHandler()); if (timeout == -1) { fut.onDone(this, timeoutException()); From 9f4f4dac6451226e498abd681c97b92d723b74b5 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:30:09 +0300 Subject: [PATCH 23/33] WIP --- .../distributed/near/GridNearTxLocal.java | 257 +++++++++--------- 1 file changed, 121 insertions(+), 136 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 9cb73dc8399fe..8b324d8898c76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -97,9 +97,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridInClosure3; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -3950,37 +3948,35 @@ public IgniteInternalFuture commitNearTxLocalAsync() { ((GridFutureAdapter)prepFut).onDone(t); } - prepareFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - // These values should not be changed after set once. - prepareTime.compareAndSet(0, System.nanoTime() - prepareStartTime.get()); + prepareFut.listen((IgniteInternalFuture f) -> { + // These values should not be changed after set once. + prepareTime.compareAndSet(0, System.nanoTime() - prepareStartTime.get()); - commitOrRollbackStartTime.compareAndSet(0, System.nanoTime()); + commitOrRollbackStartTime.compareAndSet(0, System.nanoTime()); - if (!onePhaseCommit) - incrementalSnapshotId(cctx.snapshotMgr().incrementalSnapshotId()); + if (!onePhaseCommit) + incrementalSnapshotId(cctx.snapshotMgr().incrementalSnapshotId()); - try { - // Make sure that here are no exceptions. - f.get(); + try { + // Make sure that here are no exceptions. + f.get(); - fut.finish(true, true, false); - } - catch (Error | RuntimeException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + fut.finish(true, true, false); + } + catch (Error | RuntimeException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - fut.finish(false, true, false); + fut.finish(false, true, false); - throw e; - } - catch (IgniteCheckedException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + throw e; + } + catch (IgniteCheckedException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - if (!(e instanceof NodeStoppingException)) - fut.finish(false, true, true); - else - fut.onNodeStop(e); - } + if (!(e instanceof NodeStoppingException)) + fut.finish(false, true, true); + else + fut.onNodeStop(e); } }); } @@ -4118,41 +4114,39 @@ private IgniteInternalFuture chainFinishFuture(final NearTxFin if (!commit) { final GridNearTxFinishFuture rollbackFut = new GridNearTxFinishFuture<>(cctx, this, false); - fut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut0) { - if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { - switch (tx.state()) { - case COMMITTED: - if (log.isDebugEnabled()) - log.debug("Failed to rollback, transaction is already committed: " + tx); + fut.listen((IgniteInternalFuture fut0) -> { + if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { + switch (tx.state()) { + case COMMITTED: + if (log.isDebugEnabled()) + log.debug("Failed to rollback, transaction is already committed: " + tx); - // Fall-through. + // Fall-through. - case ROLLED_BACK: - rollbackFut.forceFinish(); + case ROLLED_BACK: + rollbackFut.forceFinish(); - assert rollbackFut.isDone() : rollbackFut; + assert rollbackFut.isDone() : rollbackFut; - break; + break; - default: // First finish attempt was unsuccessful. Try again. - rollbackFut.finish(false, clearThreadMap, onTimeout); - } + default: // First finish attempt was unsuccessful. Try again. + rollbackFut.finish(false, clearThreadMap, onTimeout); } - else { - finishFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + } + else { + finishFut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + try { + fut.get(); - rollbackFut.markInitialized(); - } - catch (IgniteCheckedException e) { - rollbackFut.onDone(e); - } + rollbackFut.markInitialized(); } - }); - } + catch (IgniteCheckedException e) { + rollbackFut.onDone(e); + } + } + }); } }); @@ -4161,15 +4155,13 @@ private IgniteInternalFuture chainFinishFuture(final NearTxFin else { final GridFutureAdapter fut0 = new GridFutureAdapter<>(); - fut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - if (timedOut()) - fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back on timeout: " + tx)); - else - fut0.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back: " + tx)); - } + fut.listen((IgniteInternalFuture f) -> { + if (timedOut()) + fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back on timeout: " + tx)); + else + fut0.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back: " + tx)); }); return fut0; @@ -4673,24 +4665,21 @@ private IgniteInternalFuture> checkMissed( final boolean needReadVer = (serializable() && optimistic()) || needVer; - return new GridEmbeddedFuture<>( - new C2>() { - @Override public Map apply(Void v, Exception e) { - if (e != null) - throw new GridClosureException(e); + return new GridEmbeddedFuture<>((Void v, Exception e) -> { + if (e != null) + throw new GridClosureException(e); - if (isRollbackOnly()) { - if (timedOut()) - throw new GridClosureException(new IgniteTxTimeoutCheckedException( - "Transaction has been timed out: " + GridNearTxLocal.this)); - else - throw new GridClosureException(new IgniteTxRollbackCheckedException( - "Transaction has been rolled back: " + GridNearTxLocal.this)); - } + if (isRollbackOnly()) { + if (timedOut()) + throw new GridClosureException(new IgniteTxTimeoutCheckedException( + "Transaction has been timed out: " + GridNearTxLocal.this)); + else + throw new GridClosureException(new IgniteTxRollbackCheckedException( + "Transaction has been rolled back: " + GridNearTxLocal.this)); + } - return map; - } - }, + return map; + }, loadMissing( cacheCtx, topVer, @@ -4702,69 +4691,67 @@ private IgniteInternalFuture> checkMissed( recovery, readRepairStrategy, expiryPlc, - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { - CacheObject cacheVal = cacheCtx.toCacheObject(val); + (KeyCacheObject key, Object val, GridCacheVersion loadVer) -> { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - CacheObject visibleVal = cacheVal; + CacheObject visibleVal = cacheVal; - IgniteTxKey txKey = cacheCtx.txKey(key); + IgniteTxKey txKey = cacheCtx.txKey(key); - IgniteTxEntry txEntry = entry(txKey); + IgniteTxEntry txEntry = entry(txKey); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(cacheVal); - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - assert txEntry != null || readCommitted() || skipVals; + assert txEntry != null || readCommitted() || skipVals; - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); - if (readCommitted() || skipVals) { - e.touch(); + if (readCommitted() || skipVals) { + e.touch(); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? loadVer : null, + 0, + 0, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); } - else { - assert txEntry != null; + } + else { + assert txEntry != null; - txEntry.setAndMarkValid(cacheVal); + txEntry.setAndMarkValid(cacheVal); - if (needReadVer) { - assert loadVer != null; + if (needReadVer) { + assert loadVer != null; - txEntry.entryReadVersion(loadVer); - } + txEntry.entryReadVersion(loadVer); + } - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? loadVer : null, + 0, + 0, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); } } }) @@ -4932,15 +4919,13 @@ private boolean removeTimeoutHandler() { } if (proceed || (state() == MARKED_ROLLBACK)) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - // Note: if rollback asynchronously on timeout should not clear thread map - // since thread started tx still should be able to see this tx. - rollbackNearTxLocalAsync(false, true); - - U.warn(log, "The transaction was forcibly rolled back because a timeout is reached: " + - CU.txString(GridNearTxLocal.this)); - } + cctx.kernalContext().closure().runLocalSafe(() -> { + // Note: if rollback asynchronously on timeout should not clear thread map + // since thread started tx still should be able to see this tx. + rollbackNearTxLocalAsync(false, true); + + U.warn(log, "The transaction was forcibly rolled back because a timeout is reached: " + + CU.txString(GridNearTxLocal.this)); }); } else { From 1b84b14e9315a36dbcee14bb48f4ccd1331bea4a Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:34:16 +0300 Subject: [PATCH 24/33] WIP --- .../cache/distributed/near/GridNearTxLocal.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8b324d8898c76..ca5b56bd7bbd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -107,7 +107,6 @@ import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; @@ -4135,16 +4134,14 @@ private IgniteInternalFuture chainFinishFuture(final NearTxFin } } else { - finishFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + finishFut.listen((IgniteInternalFuture f) -> { + try { + f.get(); - rollbackFut.markInitialized(); - } - catch (IgniteCheckedException e) { - rollbackFut.onDone(e); - } + rollbackFut.markInitialized(); + } + catch (IgniteCheckedException e) { + rollbackFut.onDone(e); } }); } From ab4b7f5babba07362cc2c443cb5f54821c3dbc12 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:43:12 +0300 Subject: [PATCH 25/33] WIP --- .../cache/distributed/dht/GridDhtTxLocal.java | 23 ++++--------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 4c092d6d5004f..a4ace267401ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -47,12 +47,10 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -428,11 +426,7 @@ else if (!lockFut.isDone()) { */ final IgniteInternalFuture finalPrepFut = prepFut; - lockFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture ignored) { - finishTx(false, finalPrepFut, fut); - } - }); + lockFut.listen((IgniteInternalFuture ignored) -> finishTx(false, finalPrepFut, fut)); return; } @@ -502,13 +496,8 @@ public IgniteInternalFuture commitDhtLocalAsync() { if (prep != null) { if (prep.isDone()) finishTx(true, prep, fut); - else { - prep.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - finishTx(true, f, fut); - } - }); - } + else + prep.listen((IgniteInternalFuture f) -> finishTx(true, f, fut)); } else { assert optimistic(); @@ -556,11 +545,7 @@ public IgniteInternalFuture rollbackDhtLocalAsync() { if (prepFut != null) { prepFut.complete(); - prepFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - finishTx(false, f, fut); - } - }); + prepFut.listen((IgniteInternalFuture f) -> finishTx(false, f, fut)); } else finishTx(false, null, fut); From 9d933fc4b55559b6f976bfc74193e22b53b2f89d Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:46:58 +0300 Subject: [PATCH 26/33] WIP --- .../cache/distributed/GridDistributedTxRemoteAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 544b6638a4db3..2d12a0d6a8049 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -155,7 +155,7 @@ protected GridDistributedTxRemoteAdapter( int txSize, @Nullable UUID subjId, int taskNameHash, - String txLbl + @Nullable String txLbl ) { super( ctx, From 4abbe62101419e785c4864b12599cb2d5d374910 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:47:53 +0300 Subject: [PATCH 27/33] WIP --- .../cache/distributed/GridDistributedTxRemoteAdapter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 2d12a0d6a8049..9b0f870cbbb6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -99,8 +99,7 @@ /** * Transaction created by system implicitly on remote nodes. */ -public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter - implements IgniteTxRemoteEx { +public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter implements IgniteTxRemoteEx { /** Commit allowed field updater. */ private static final AtomicIntegerFieldUpdater COMMIT_ALLOWED_UPD = AtomicIntegerFieldUpdater.newUpdater(GridDistributedTxRemoteAdapter.class, "commitAllowed"); From 842853b2afc9bdfc91c0999045294a70ca186f17 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:51:04 +0300 Subject: [PATCH 28/33] WIP --- .../processors/cache/store/GridCacheStoreManagerAdapter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 24ac8dc8675ca..40ef622b71eb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -621,7 +621,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, if (map.size() == 1) { Map.Entry> e = - ((Map>)map).entrySet().iterator().next(); + map.entrySet().iterator().next(); return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); } @@ -909,7 +909,7 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp break; default: - assert false : "Unexpected operation: " + op.toString(); + assert false : "Unexpected operation: " + op; } } if (notifyLsnrs) { From 1b9fcc75fa35620f82656edae7b474e652fab20a Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:51:38 +0300 Subject: [PATCH 29/33] WIP --- .../processors/cache/store/GridCacheStoreManagerAdapter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 40ef622b71eb2..296a3cf5848bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -785,7 +785,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, /** {@inheritDoc} */ @Override public final void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last, - boolean storeSessionEnded) throws IgniteCheckedException { + boolean storeSesEnded) throws IgniteCheckedException { assert store != null; sessionInit0(tx, commit ? StoreOperation.COMMIT : StoreOperation.ROLLBACK, false); @@ -796,7 +796,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, lsnr.onSessionEnd(locSes, commit); } - if (!sesHolder.get().ended(store) && !storeSessionEnded) + if (!sesHolder.get().ended(store) && !storeSesEnded) store.sessionEnd(commit); } catch (Throwable e) { From ae07c0c44fd54520564cf051a1c903722f94062a Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:53:19 +0300 Subject: [PATCH 30/33] WIP --- .../cache/store/GridCacheStoreManagerAdapter.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 296a3cf5848bd..136a28098b5b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -351,13 +351,10 @@ private CacheStore cacheStoreWrapper(GridKernalContext ctx, "key", key, true, "val", val, true)); - if (convert) { + if (convert) val = convert(val); - return val; - } - else - return val; + return val; } return null; From 3fa744e12c6db42297b99f772accc161dfb50f14 Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Mon, 7 Aug 2023 17:56:01 +0300 Subject: [PATCH 31/33] WIP --- .../processors/cache/store/GridCacheStoreManagerAdapter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 136a28098b5b1..adb13f44816fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -973,8 +973,7 @@ private static class SessionData { private Object attach; /** */ - private final Set started = - new GridSetWrapper<>(new IdentityHashMap()); + private final Set started = new GridSetWrapper<>(new IdentityHashMap<>()); /** * @param tx Current transaction. From a3d4eb51a970e457dc8c0c1abad777b1698157ee Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Tue, 8 Aug 2023 18:41:46 +0300 Subject: [PATCH 32/33] WIP --- .../processors/cache/transactions/IgniteTxAdapter.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index d362096498fdc..b4bb0463e26f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1272,12 +1272,7 @@ private void recordStateChangedEvent(TransactionState state) { break; } - case PREPARING: - case MARKED_ROLLBACK: - case PREPARED: - case COMMITTING: - case ROLLING_BACK: - case UNKNOWN: + default: break; } } From b6ff3affa13dd220e50beb52900ed0719306206d Mon Sep 17 00:00:00 2001 From: Anton Vinogradov Date: Tue, 8 Aug 2023 19:05:46 +0300 Subject: [PATCH 33/33] WIP --- .../ignite/internal/util/lang/GridMetadataAwareAdapter.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java index a530534c76621..c8e28753d2822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java @@ -21,6 +21,7 @@ import java.util.concurrent.Callable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -278,9 +279,7 @@ public V addMetaIfAbsent(int key, V val) { * @param Type of the value. * @return The value of the metadata after execution of this method. */ - @Nullable public V addMetaIfAbsent(int key, @Nullable Callable c) { - assert c != null; - + @Nullable public V addMetaIfAbsent(int key, @NotNull Callable c) { synchronized (this) { V v = meta(key);