diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 544b6638a4db3..9b0f870cbbb6b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -99,8 +99,7 @@ /** * Transaction created by system implicitly on remote nodes. */ -public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter - implements IgniteTxRemoteEx { +public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter implements IgniteTxRemoteEx { /** Commit allowed field updater. */ private static final AtomicIntegerFieldUpdater COMMIT_ALLOWED_UPD = AtomicIntegerFieldUpdater.newUpdater(GridDistributedTxRemoteAdapter.class, "commitAllowed"); @@ -155,7 +154,7 @@ protected GridDistributedTxRemoteAdapter( int txSize, @Nullable UUID subjId, int taskNameHash, - String txLbl + @Nullable String txLbl ) { super( ctx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 4c092d6d5004f..a4ace267401ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -47,12 +47,10 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -428,11 +426,7 @@ else if (!lockFut.isDone()) { */ final IgniteInternalFuture finalPrepFut = prepFut; - lockFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture ignored) { - finishTx(false, finalPrepFut, fut); - } - }); + lockFut.listen((IgniteInternalFuture ignored) -> finishTx(false, finalPrepFut, fut)); return; } @@ -502,13 +496,8 @@ public IgniteInternalFuture commitDhtLocalAsync() { if (prep != null) { if (prep.isDone()) finishTx(true, prep, fut); - else { - prep.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - finishTx(true, f, fut); - } - }); - } + else + prep.listen((IgniteInternalFuture f) -> finishTx(true, f, fut)); } else { assert optimistic(); @@ -556,11 +545,7 @@ public IgniteInternalFuture rollbackDhtLocalAsync() { if (prepFut != null) { prepFut.complete(); - prepFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - finishTx(false, f, fut); - } - }); + prepFut.listen((IgniteInternalFuture f) -> finishTx(false, f, fut)); } else finishTx(false, null, fut); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index f82ad2b8b23ea..2289b371ba63d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -251,7 +251,7 @@ public void explicitLock(boolean explicitLock) { * @return Versions for all pending locks that were in queue before tx locks were released. */ Collection pendingVersions() { - return pendingVers == null ? Collections.emptyList() : pendingVers; + return pendingVers == null ? Collections.emptyList() : pendingVers; } /** @@ -279,7 +279,7 @@ protected void mapExplicitLocks() { for (IgniteTxEntry e : allEntries()) { assert e.cached() != null; - GridCacheContext cacheCtx = e.cached().context(); + GridCacheContext cacheCtx = e.cached().context(); if (cacheCtx.isNear()) continue; @@ -470,9 +470,9 @@ private void addMapping( checkInternal(e.txKey()); - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); - GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); try { IgniteTxEntry existing = entry(e.txKey()); @@ -575,7 +575,7 @@ IgniteInternalFuture lockAllAsync( try { AffinityTopologyVersion topVer = topologyVersion(); - GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); + GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); // Enlist locks into transaction. for (int i = 0; i < entries.size(); i++) { @@ -911,7 +911,7 @@ else if (updateLockFuture(fut, ROLLBACK_FUT)) /** * @param prepFut Prepare future. - * @return If transaction if finished on prepare step returns future which is completed after transaction finish. + * @return If transaction is finished on prepare step returns future which is completed after transaction finish. */ protected final IgniteInternalFuture chainOnePhasePrepare( final GridDhtTxPrepareFuture prepFut) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c5555d97ac930..ca5b56bd7bbd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -97,10 +97,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridInClosure3; -import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -110,7 +107,6 @@ import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; @@ -191,13 +187,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * Counts how much time this transaction has spent on system calls, in nanoseconds. */ - private final AtomicLong systemTime = new AtomicLong(0); + private final AtomicLong sysTime = new AtomicLong(0); /** * Stores the nano time value when current system time has started, or 0 if no system section * is running currently. */ - private final AtomicLong systemStartTime = new AtomicLong(0); + private final AtomicLong sysStartTime = new AtomicLong(0); /** * Stores the nano time value when prepare step has started, or 0 if no prepare step @@ -492,20 +488,20 @@ public final IgniteInternalFuture putAsync( /** * @param cacheCtx Cache context. * @param key Key. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for entry processor. * @return Operation future. */ public IgniteInternalFuture invokeAsync(GridCacheContext cacheCtx, @Nullable AffinityTopologyVersion entryTopVer, K key, - EntryProcessor entryProcessor, + EntryProcessor entryProc, Object... invokeArgs) { return (IgniteInternalFuture)putAsync0(cacheCtx, entryTopVer, key, null, - entryProcessor, + entryProc, invokeArgs, true, null); @@ -541,11 +537,7 @@ public IgniteInternalFuture putAllDrAsync( GridCacheContext cacheCtx, Map drMap ) { - Map map = F.viewReadOnly(drMap, new IgniteClosure() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); + Map map = F.viewReadOnly(drMap, (IgniteClosure)GridCacheDrInfo::value); return this.putAllAsync0(cacheCtx, null, @@ -593,7 +585,7 @@ public IgniteInternalFuture removeAllAsync( * @param cacheCtx Cache context. * @param key Key. * @param val Value. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Return value flag. * @param filter Filter. @@ -604,7 +596,7 @@ private IgniteInternalFuture putAsync0( @Nullable AffinityTopologyVersion entryTopVer, K key, @Nullable V val, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, @Nullable final Object[] invokeArgs, final boolean retval, @Nullable final CacheEntryPredicate filter @@ -613,7 +605,7 @@ private IgniteInternalFuture putAsync0( if (cacheCtx.mvccEnabled()) return mvccPutAllAsync0(cacheCtx, Collections.singletonMap(key, val), - entryProcessor == null ? null : Collections.singletonMap(key, entryProcessor), invokeArgs, retval, filter); + entryProc == null ? null : Collections.singletonMap(key, entryProc), invokeArgs, retval, filter); try { beforePut(cacheCtx, retval, false); @@ -636,7 +628,7 @@ private IgniteInternalFuture putAsync0( cacheKey, val, opCtx != null ? opCtx.expiry() : null, - entryProcessor, + entryProc, invokeArgs, retval, filters, @@ -670,7 +662,7 @@ private IgniteInternalFuture putAsync0( IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, timeout, this, - /*read*/entryProcessor != null, // Needed to force load from store. + /*read*/entryProc != null, // Needed to force load from store. retval, isolation, isInvalidate(), @@ -799,9 +791,9 @@ private IgniteInternalFuture mvccPutAllAsync0( } Object val = map == null ? null : map.get(key); - EntryProcessor entryProcessor = transform ? invokeMap.get(key) : null; + EntryProcessor entryProc = transform ? invokeMap.get(key) : null; - if (val == null && entryProcessor == null) { + if (val == null && entryProc == null) { setRollbackOnly(); throw new NullPointerException("Null value."); @@ -810,7 +802,7 @@ private IgniteInternalFuture mvccPutAllAsync0( KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); if (transform) - enlisted.put(cacheKey, new GridInvokeValue(entryProcessor, invokeArgs)); + enlisted.put(cacheKey, new GridInvokeValue(entryProc, invokeArgs)); else enlisted.put(cacheKey, val); } @@ -1025,7 +1017,7 @@ private IgniteInternalFuture putAllAsync0( * @param cacheKey Key to enlist. * @param val Value. * @param expiryPlc Explicitly specified expiry policy for entry. - * @param entryProcessor Entry processor (for invoke operation). + * @param entryProc Entry processor (for invoke operation). * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Flag indicating whether a value should be returned. * @param filter User filters. @@ -1041,7 +1033,7 @@ private IgniteInternalFuture enlistWrite( KeyCacheObject cacheKey, Object val, @Nullable ExpiryPolicy expiryPlc, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, @Nullable Object[] invokeArgs, final boolean retval, final CacheEntryPredicate[] filter, @@ -1064,7 +1056,7 @@ private IgniteInternalFuture enlistWrite( final boolean needVal = retval || hasFilters; final boolean needReadVer = needVal && (serializable() && optimistic()); - if (entryProcessor != null) + if (entryProc != null) transform = true; GridCacheVersion drVer = dataCenterId != null ? cacheCtx.cache().nextVersion(dataCenterId) : null; @@ -1073,7 +1065,7 @@ private IgniteInternalFuture enlistWrite( entryTopVer, cacheKey, val, - entryProcessor, + entryProc, invokeArgs, expiryPlc, retval, @@ -1104,22 +1096,20 @@ private IgniteInternalFuture enlistWrite( needReadVer, false, hasFilters, - /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, + /*read through*/(entryProc != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, keepBinary, recovery, expiryPlc); - loadFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + loadFut.listen((IgniteInternalFuture fut) -> { + try { + fut.get(); - finishFuture(enlistFut, null, true); - } - catch (IgniteCheckedException e) { - finishFuture(enlistFut, e, true); - } + finishFuture(enlistFut, null, true); + } + catch (IgniteCheckedException e) { + finishFuture(enlistFut, e, true); } }); @@ -1218,7 +1208,7 @@ private IgniteInternalFuture enlistWrite( } Object val = rmv || lookup == null ? null : lookup.get(key); - EntryProcessor entryProcessor = invokeMap == null ? null : invokeMap.get(key); + EntryProcessor entryProc = invokeMap == null ? null : invokeMap.get(key); GridCacheVersion drVer; long drTtl; @@ -1251,7 +1241,7 @@ else if (dataCenterId != null) { drExpireTime = -1L; } - if (!rmv && val == null && entryProcessor == null) { + if (!rmv && val == null && entryProc == null) { setRollbackOnly(); throw new NullPointerException("Null value."); @@ -1263,7 +1253,7 @@ else if (dataCenterId != null) { entryTopVer, cacheKey, val, - entryProcessor, + entryProc, invokeArgs, expiryPlc, retval, @@ -1308,16 +1298,14 @@ else if (dataCenterId != null) { recovery, expiryPlc); - loadFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + loadFut.listen((IgniteInternalFuture fut) -> { + try { + fut.get(); - finishFuture(enlistFut, null, true); - } - catch (IgniteCheckedException e) { - finishFuture(enlistFut, e, true); - } + finishFuture(enlistFut, null, true); + } + catch (IgniteCheckedException e) { + finishFuture(enlistFut, e, true); } }); @@ -1336,7 +1324,7 @@ else if (dataCenterId != null) { * @param cacheCtx Cache context. * @param cacheKey Key. * @param val Value. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Explicitly specified expiry policy for entry. * @param retval Return value flag. @@ -1358,7 +1346,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, @Nullable AffinityTopologyVersion entryTopVer, final KeyCacheObject cacheKey, @Nullable final Object val, - @Nullable final EntryProcessor entryProcessor, + @Nullable final EntryProcessor entryProc, @Nullable final Object[] invokeArgs, @Nullable final ExpiryPolicy expiryPlc, final boolean retval, @@ -1377,7 +1365,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, ) throws IgniteCheckedException { boolean loadMissed = false; - final boolean rmv = val == null && entryProcessor == null; + final boolean rmv = val == null && entryProc == null; IgniteTxKey txKey = cacheCtx.txKey(cacheKey); @@ -1417,7 +1405,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, this, /*metrics*/retval, /*events*/retval, - entryProcessor, + entryProc, resolveTaskName(), null, keepBinary, @@ -1443,7 +1431,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, /*read through*/false, /*metrics*/retval, /*events*/retval, - entryProcessor, + entryProc, resolveTaskName(), null, keepBinary); @@ -1463,7 +1451,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, old = entry.rawGet(); final GridCacheOperation op = rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + entryProc != null ? TRANSFORM : old != null ? UPDATE : CREATE; if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { ret.set( @@ -1478,7 +1466,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, if (optimistic() && serializable()) { txEntry = addEntry(op, old, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -1530,7 +1518,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, txEntry = addEntry(op, cVal, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -1622,7 +1610,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, } } else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) + if (entryProc == null && txEntry.op() == TRANSFORM) throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); @@ -1639,7 +1627,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, return loadMissed; } - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + GridCacheOperation op = rmv ? DELETE : entryProc != null ? TRANSFORM : v != null ? UPDATE : CREATE; CacheObject cVal = cacheCtx.toCacheObject(val); @@ -1649,7 +1637,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, txEntry = addEntry(op, cVal, - entryProcessor, + entryProc, invokeArgs, entry, expiryPlc, @@ -2755,7 +2743,7 @@ private Collection enlistRead( // First time access within transaction. else { if (lockKeys == null && !skipVals) - lockKeys = single ? Collections.singleton(key) : new ArrayList(keysCnt); + lockKeys = single ? Collections.singleton(key) : new ArrayList<>(keysCnt); if (!single && !skipVals) lockKeys.add(key); @@ -2911,74 +2899,70 @@ private IgniteInternalFuture loadMissing( final boolean recovery, final ExpiryPolicy expiryPlc) { GridInClosure3 c = - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + (KeyCacheObject key, @Nullable Object val, @Nullable GridCacheVersion loadVer) -> { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - assert e != null; + assert e != null; - if (needReadVer) { - assert loadVer != null; + if (needReadVer) { + assert loadVer != null; - e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); - } + e.entryReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - if (singleRmv) { - assert !hasFilters && !retval; - assert val == null || Boolean.TRUE.equals(val) : val; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - ret.set(cacheCtx, null, val != null, keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } - else { - CacheObject cacheVal = cacheCtx.toCacheObject(val); + ret.set(cacheCtx, null, val != null, keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (e.op() == TRANSFORM) { - GridCacheVersion ver; + if (e.op() == TRANSFORM) { + GridCacheVersion ver; - e.readValue(cacheVal); + e.readValue(cacheVal); - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; - - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; - ver = null; - } + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - addInvokeResult(e, cacheVal, ret, ver); + ver = null; } - else { - boolean success; - if (hasFilters) { - success = isAll(e.context(), key, cacheVal, filter); + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success; - if (!success) { - e.value(cacheVal, false, false); + if (hasFilters) { + success = isAll(e.context(), key, cacheVal, filter); - e.op(READ); - } - } - else - success = true; + if (!success) { + e.value(cacheVal, false, false); - ret.set( - cacheCtx, - cacheVal, - success, - keepBinary, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId) - ); + e.op(READ); + } } + else + success = true; + + ret.set( + cacheCtx, + cacheVal, + success, + keepBinary, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId) + ); } } }; @@ -3098,7 +3082,7 @@ private void onException() { * @return Future with {@code True} value if loading took place. */ private IgniteInternalFuture loadMissing( - final GridCacheContext cacheCtx, + final GridCacheContext cacheCtx, AffinityTopologyVersion topVer, boolean readThrough, final Collection keys, @@ -3116,28 +3100,27 @@ private IgniteInternalFuture loadMissing( if (cacheCtx.isNear()) { return cacheCtx.nearTx().txLoadAsync(this, - topVer, - keys, - readThrough, - needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() && serializable() && readThrough), - /*deserializeBinary*/false, - recovery, - expiryPlc0, - skipVals, - needVer).chain(new C1>, Void>() { - @Override public Void apply(IgniteInternalFuture> f) { - try { - Map map = f.get(); + topVer, + keys, + readThrough, + needVer || !cacheCtx.config().isReadFromBackup() || (optimistic() && serializable() && readThrough), + /*deserializeBinary*/false, + recovery, + expiryPlc0, + skipVals, + needVer) + .chain((IgniteInternalFuture> f) -> { + try { + Map map = f.get(); - processLoaded(map, keys, needVer, c); + processLoaded(map, keys, needVer, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } @@ -3198,20 +3181,18 @@ private IgniteInternalFuture loadMissing( recovery, null, label() - ).chain(new C1, Void>() { - @Override public Void apply(IgniteInternalFuture f) { - try { - Object val = f.get(); + ).chain((IgniteInternalFuture f) -> { + try { + Object val = f.get(); - processLoaded(key, val, needVer, skipVals, c); + processLoaded(key, val, needVer, skipVals, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } @@ -3230,20 +3211,18 @@ private IgniteInternalFuture loadMissing( /*keepCacheObject*/true, label(), null - ).chain(new C1>, Void>() { - @Override public Void apply(IgniteInternalFuture> f) { - try { - Map map = f.get(); + ).chain((IgniteInternalFuture> f) -> { + try { + Map map = f.get(); - processLoaded(map, keys, needVer, c); + processLoaded(map, keys, needVer, c); - return null; - } - catch (Exception e) { - setRollbackOnly(); + return null; + } + catch (Exception e) { + setRollbackOnly(); - throw new GridClosureException(e); - } + throw new GridClosureException(e); } }); } @@ -3377,7 +3356,7 @@ public IgniteInternalFuture requestSnapshot() { MvccCoordinator crd = prc.currentCoordinator(); synchronized (this) { - this.crdVer = crd.version(); + crdVer = crd.version(); } if (crd.local()) @@ -3633,7 +3612,7 @@ private void readyNearLock(IgniteTxEntry txEntry, Collection rolledbackVers ) { while (true) { - GridCacheContext cacheCtx = txEntry.cached().context(); + GridCacheContext cacheCtx = txEntry.cached().context(); assert cacheCtx.isNear(); @@ -3756,13 +3735,13 @@ private void readyNearLock(IgniteTxEntry txEntry, * @return Amount of time in milliseconds. */ public long systemTimeCurrent() { - long systemTime0 = systemTime.get(); + long sysTime0 = sysTime.get(); - long systemStartTime0 = systemStartTime.get(); + long sysStartTime0 = sysStartTime.get(); - long t = systemStartTime0 == 0 ? 0 : (System.nanoTime() - systemStartTime0); + long t = sysStartTime0 == 0 ? 0 : (System.nanoTime() - sysStartTime0); - return U.nanosToMillis(systemTime0 + t); + return U.nanosToMillis(sysTime0 + t); } /** {@inheritDoc} */ @@ -3776,13 +3755,13 @@ public long systemTimeCurrent() { if (!commitOrRollbackTime.compareAndSet(0, System.nanoTime() - commitOrRollbackStartTime.get())) return res; - long systemTimeMillis = U.nanosToMillis(this.systemTime.get()); + long sysTimeMillis = U.nanosToMillis(sysTime.get()); long totalTimeMillis = System.currentTimeMillis() - startTime(); // In some cases totalTimeMillis can be less than systemTimeMillis, as they are calculated with different precision. - long userTimeMillis = Math.max(totalTimeMillis - systemTimeMillis, 0); + long userTimeMillis = Math.max(totalTimeMillis - sysTimeMillis, 0); - cctx.txMetrics().onNearTxComplete(systemTimeMillis, userTimeMillis); + cctx.txMetrics().onNearTxComplete(sysTimeMillis, userTimeMillis); boolean willBeSkipped = txDumpsThrottling == null || txDumpsThrottling.skipCurrent(); @@ -3797,7 +3776,7 @@ public long systemTimeCurrent() { && ThreadLocalRandom.current().nextDouble() <= transactionTimeDumpSamplesCoefficient; if (randomlyChosen || isLong) { - String txDump = completedTransactionDump(state, systemTimeMillis, userTimeMillis, isLong); + String txDump = completedTransactionDump(state, sysTimeMillis, userTimeMillis, isLong); if (isLong) log.warning(txDump); @@ -3818,27 +3797,27 @@ else if (txDumpsThrottling != null) * Builds dump string for completed transaction. * * @param state Transaction state. - * @param systemTimeMillis System time in milliseconds. + * @param sysTimeMillis System time in milliseconds. * @param userTimeMillis User time in milliseconds. * @param isLong Whether the dumped transaction is long running or not. * @return Dump string. */ private String completedTransactionDump( TransactionState state, - long systemTimeMillis, + long sysTimeMillis, long userTimeMillis, boolean isLong ) { long cacheOperationsTimeMillis = - U.nanosToMillis(systemTime.get() - prepareTime.get() - commitOrRollbackTime.get()); + U.nanosToMillis(sysTime.get() - prepareTime.get() - commitOrRollbackTime.get()); GridStringBuilder warning = new GridStringBuilder(isLong ? "Long transaction time dump " : "Transaction time dump ") .a("[startTime=") .a(IgniteUtils.DEBUG_DATE_FMT.format(Instant.ofEpochMilli(startTime))) .a(", totalTime=") - .a(systemTimeMillis + userTimeMillis) + .a(sysTimeMillis + userTimeMillis) .a(", systemTime=") - .a(systemTimeMillis) + .a(sysTimeMillis) .a(", userTime=") .a(userTimeMillis) .a(", cacheOperationsTime=") @@ -3891,13 +3870,8 @@ public IgniteInternalFuture prepareNearTxLocal() { if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; - if (trackTimeout) { - prepFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture f) { - GridNearTxLocal.this.removeTimeoutHandler(); - } - }); - } + if (trackTimeout) + prepFut.listen((IgniteInternalFuture f) -> removeTimeoutHandler()); if (timeout == -1) { fut.onDone(this, timeoutException()); @@ -3927,11 +3901,11 @@ public IgniteInternalFuture prepareNearTxLocal() { } /** - * @param awaitLastFuture If true - method will wait until transaction finish every action started before. + * @param awaitLastFut If true - method will wait until transaction finish every action started before. * @throws IgniteCheckedException If failed. */ - public final void prepare(boolean awaitLastFuture) throws IgniteCheckedException { - if (awaitLastFuture) + public final void prepare(boolean awaitLastFut) throws IgniteCheckedException { + if (awaitLastFut) txState().awaitLastFuture(cctx); prepareNearTxLocal().get(); @@ -3970,40 +3944,38 @@ public IgniteInternalFuture commitNearTxLocalAsync() { // Properly finish prepFut in case of unchecked error. assert prepareFut != null; // Prep future must be set. - ((GridNearTxPrepareFutureAdapter)prepFut).onDone(t); + ((GridFutureAdapter)prepFut).onDone(t); } - prepareFut.listen(new CI1>() { - @Override public void apply(IgniteInternalFuture f) { - // These values should not be changed after set once. - prepareTime.compareAndSet(0, System.nanoTime() - prepareStartTime.get()); + prepareFut.listen((IgniteInternalFuture f) -> { + // These values should not be changed after set once. + prepareTime.compareAndSet(0, System.nanoTime() - prepareStartTime.get()); - commitOrRollbackStartTime.compareAndSet(0, System.nanoTime()); + commitOrRollbackStartTime.compareAndSet(0, System.nanoTime()); - if (!onePhaseCommit) - incrementalSnapshotId(cctx.snapshotMgr().incrementalSnapshotId()); + if (!onePhaseCommit) + incrementalSnapshotId(cctx.snapshotMgr().incrementalSnapshotId()); - try { - // Make sure that here are no exceptions. - f.get(); + try { + // Make sure that here are no exceptions. + f.get(); - fut.finish(true, true, false); - } - catch (Error | RuntimeException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + fut.finish(true, true, false); + } + catch (Error | RuntimeException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - fut.finish(false, true, false); + fut.finish(false, true, false); - throw e; - } - catch (IgniteCheckedException e) { - COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); + throw e; + } + catch (IgniteCheckedException e) { + COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); - if (!(e instanceof NodeStoppingException)) - fut.finish(false, true, true); - else - fut.onNodeStop(e); - } + if (!(e instanceof NodeStoppingException)) + fut.finish(false, true, true); + else + fut.onNodeStop(e); } }); } @@ -4141,41 +4113,37 @@ private IgniteInternalFuture chainFinishFuture(final NearTxFin if (!commit) { final GridNearTxFinishFuture rollbackFut = new GridNearTxFinishFuture<>(cctx, this, false); - fut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut0) { - if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { - switch (tx.state()) { - case COMMITTED: - if (log.isDebugEnabled()) - log.debug("Failed to rollback, transaction is already committed: " + tx); + fut.listen((IgniteInternalFuture fut0) -> { + if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) { + switch (tx.state()) { + case COMMITTED: + if (log.isDebugEnabled()) + log.debug("Failed to rollback, transaction is already committed: " + tx); - // Fall-through. + // Fall-through. - case ROLLED_BACK: - rollbackFut.forceFinish(); + case ROLLED_BACK: + rollbackFut.forceFinish(); - assert rollbackFut.isDone() : rollbackFut; + assert rollbackFut.isDone() : rollbackFut; - break; + break; - default: // First finish attempt was unsuccessful. Try again. - rollbackFut.finish(false, clearThreadMap, onTimeout); - } + default: // First finish attempt was unsuccessful. Try again. + rollbackFut.finish(false, clearThreadMap, onTimeout); } - else { - finishFut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - try { - fut.get(); + } + else { + finishFut.listen((IgniteInternalFuture f) -> { + try { + f.get(); - rollbackFut.markInitialized(); - } - catch (IgniteCheckedException e) { - rollbackFut.onDone(e); - } - } - }); - } + rollbackFut.markInitialized(); + } + catch (IgniteCheckedException e) { + rollbackFut.onDone(e); + } + }); } }); @@ -4184,15 +4152,13 @@ private IgniteInternalFuture chainFinishFuture(final NearTxFin else { final GridFutureAdapter fut0 = new GridFutureAdapter<>(); - fut.listen(new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture fut) { - if (timedOut()) - fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back on timeout: " + tx)); - else - fut0.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction, " + - "transaction is concurrently rolled back: " + tx)); - } + fut.listen((IgniteInternalFuture f) -> { + if (timedOut()) + fut0.onDone(new IgniteTxTimeoutCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back on timeout: " + tx)); + else + fut0.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction, " + + "transaction is concurrently rolled back: " + tx)); }); return fut0; @@ -4462,39 +4428,6 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c ); } - /** - * Gets cache entry for given key. - * - * @param cacheCtx Cache context. - * @param key Key. - * @return Cache entry. - */ - protected GridCacheEntryEx entryEx(GridCacheContext cacheCtx, IgniteTxKey key) { - if (cacheCtx.isColocated()) { - IgniteTxEntry txEntry = entry(key); - - if (txEntry == null) - return cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true); - - GridCacheEntryEx cached = txEntry.cached(); - - assert cached != null; - - if (cached.detached()) - return cached; - - if (cached.obsoleteVersion() != null) { - cached = cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true); - - txEntry.cached(cached); - } - - return cached; - } - else - return cacheCtx.cache().entryEx(key.key()); - } - /** {@inheritDoc} */ @Override protected GridCacheEntryEx entryEx( GridCacheContext cacheCtx, @@ -4607,7 +4540,7 @@ public void close(boolean clearThreadMap) throws IgniteCheckedException { for (Map.Entry e : accessMap.entrySet()) { if (e.getValue().entries() != null) { - GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); + GridCacheContext cctx0 = cctx.cacheContext(e.getKey().cacheId()); if (cctx0.isNear()) cctx0.near().dht().sendTtlUpdateRequest(e.getValue()); @@ -4729,24 +4662,21 @@ private IgniteInternalFuture> checkMissed( final boolean needReadVer = (serializable() && optimistic()) || needVer; - return new GridEmbeddedFuture<>( - new C2>() { - @Override public Map apply(Void v, Exception e) { - if (e != null) - throw new GridClosureException(e); + return new GridEmbeddedFuture<>((Void v, Exception e) -> { + if (e != null) + throw new GridClosureException(e); - if (isRollbackOnly()) { - if (timedOut()) - throw new GridClosureException(new IgniteTxTimeoutCheckedException( - "Transaction has been timed out: " + GridNearTxLocal.this)); - else - throw new GridClosureException(new IgniteTxRollbackCheckedException( - "Transaction has been rolled back: " + GridNearTxLocal.this)); - } + if (isRollbackOnly()) { + if (timedOut()) + throw new GridClosureException(new IgniteTxTimeoutCheckedException( + "Transaction has been timed out: " + GridNearTxLocal.this)); + else + throw new GridClosureException(new IgniteTxRollbackCheckedException( + "Transaction has been rolled back: " + GridNearTxLocal.this)); + } - return map; - } - }, + return map; + }, loadMissing( cacheCtx, topVer, @@ -4758,69 +4688,67 @@ private IgniteInternalFuture> checkMissed( recovery, readRepairStrategy, expiryPlc, - new GridInClosure3() { - @Override public void apply(KeyCacheObject key, Object val, GridCacheVersion loadVer) { - CacheObject cacheVal = cacheCtx.toCacheObject(val); + (KeyCacheObject key, Object val, GridCacheVersion loadVer) -> { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - CacheObject visibleVal = cacheVal; + CacheObject visibleVal = cacheVal; - IgniteTxKey txKey = cacheCtx.txKey(key); + IgniteTxKey txKey = cacheCtx.txKey(key); - IgniteTxEntry txEntry = entry(txKey); + IgniteTxEntry txEntry = entry(txKey); - if (txEntry != null) { - if (!readCommitted()) - txEntry.readValue(cacheVal); + if (txEntry != null) { + if (!readCommitted()) + txEntry.readValue(cacheVal); - if (!F.isEmpty(txEntry.entryProcessors())) - visibleVal = txEntry.applyEntryProcessors(visibleVal); - } + if (!F.isEmpty(txEntry.entryProcessors())) + visibleVal = txEntry.applyEntryProcessors(visibleVal); + } - assert txEntry != null || readCommitted() || skipVals; + assert txEntry != null || readCommitted() || skipVals; - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); - if (readCommitted() || skipVals) { - e.touch(); + if (readCommitted() || skipVals) { + e.touch(); - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? loadVer : null, + 0, + 0, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); } - else { - assert txEntry != null; + } + else { + assert txEntry != null; - txEntry.setAndMarkValid(cacheVal); + txEntry.setAndMarkValid(cacheVal); - if (needReadVer) { - assert loadVer != null; + if (needReadVer) { + assert loadVer != null; - txEntry.entryReadVersion(loadVer); - } + txEntry.entryReadVersion(loadVer); + } - if (visibleVal != null) { - cacheCtx.addResult(map, - key, - visibleVal, - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer ? loadVer : null, - 0, - 0, - U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); - } + if (visibleVal != null) { + cacheCtx.addResult(map, + key, + visibleVal, + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer ? loadVer : null, + 0, + 0, + U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)); } } }) @@ -4988,15 +4916,13 @@ private boolean removeTimeoutHandler() { } if (proceed || (state() == MARKED_ROLLBACK)) { - cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { - @Override public void run() { - // Note: if rollback asynchronously on timeout should not clear thread map - // since thread started tx still should be able to see this tx. - rollbackNearTxLocalAsync(false, true); - - U.warn(log, "The transaction was forcibly rolled back because a timeout is reached: " + - CU.txString(GridNearTxLocal.this)); - } + cctx.kernalContext().closure().runLocalSafe(() -> { + // Note: if rollback asynchronously on timeout should not clear thread map + // since thread started tx still should be able to see this tx. + rollbackNearTxLocalAsync(false, true); + + U.warn(log, "The transaction was forcibly rolled back because a timeout is reached: " + + CU.txString(GridNearTxLocal.this)); }); } else { @@ -5016,17 +4942,17 @@ private long timeMillis(AtomicLong atomicNanoTime) { public void enterSystemSection() { // Setting systemStartTime only if it equals 0, otherwise it means that we are already in system section // and should do nothing. - systemStartTime.compareAndSet(0, System.nanoTime()); + sysStartTime.compareAndSet(0, System.nanoTime()); } /** * Leaves the section when system time for this transaction is counted. */ public void leaveSystemSection() { - long systemStartTime0 = systemStartTime.getAndSet(0); + long sysStartTime0 = sysStartTime.getAndSet(0); - if (systemStartTime0 > 0) - systemTime.addAndGet(System.nanoTime() - systemStartTime0); + if (sysStartTime0 > 0) + sysTime.addAndGet(System.nanoTime() - sysStartTime0); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 24ac8dc8675ca..adb13f44816fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -351,13 +351,10 @@ private CacheStore cacheStoreWrapper(GridKernalContext ctx, "key", key, true, "val", val, true)); - if (convert) { + if (convert) val = convert(val); - return val; - } - else - return val; + return val; } return null; @@ -621,7 +618,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, if (map.size() == 1) { Map.Entry> e = - ((Map>)map).entrySet().iterator().next(); + map.entrySet().iterator().next(); return put(tx, e.getKey(), e.getValue().get1(), e.getValue().get2()); } @@ -785,7 +782,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, /** {@inheritDoc} */ @Override public final void sessionEnd(IgniteInternalTx tx, boolean commit, boolean last, - boolean storeSessionEnded) throws IgniteCheckedException { + boolean storeSesEnded) throws IgniteCheckedException { assert store != null; sessionInit0(tx, commit ? StoreOperation.COMMIT : StoreOperation.ROLLBACK, false); @@ -796,7 +793,7 @@ private void loadAllFromStore(@Nullable IgniteInternalTx tx, lsnr.onSessionEnd(locSes, commit); } - if (!sesHolder.get().ended(store) && !storeSessionEnded) + if (!sesHolder.get().ended(store) && !storeSesEnded) store.sessionEnd(commit); } catch (Throwable e) { @@ -909,7 +906,7 @@ private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOp break; default: - assert false : "Unexpected operation: " + op.toString(); + assert false : "Unexpected operation: " + op; } } if (notifyLsnrs) { @@ -976,8 +973,7 @@ private static class SessionData { private Object attach; /** */ - private final Set started = - new GridSetWrapper<>(new IdentityHashMap()); + private final Set started = new GridSetWrapper<>(new IdentityHashMap<>()); /** * @param tx Current transaction. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0bd7e0028b276..b4bb0463e26f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -326,7 +326,7 @@ protected IgniteTxAdapter( this.txSize = txSize; this.subjId = subjId; this.taskNameHash = taskNameHash; - this.deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); + deploymentLdrId = U.contextDeploymentClassLoaderId(cctx.kernalContext()); nodeId = cctx.discovery().localNode().id(); @@ -384,7 +384,7 @@ protected IgniteTxAdapter( this.txSize = txSize; this.subjId = subjId; this.taskNameHash = taskNameHash; - this.deploymentLdrId = null; + deploymentLdrId = null; implicit = false; loc = false; @@ -706,13 +706,7 @@ public boolean remote() { if (invalidParts == null) invalidParts = new HashMap<>(); - Set parts = invalidParts.get(cacheId); - - if (parts == null) { - parts = new HashSet<>(); - - invalidParts.put(cacheId, parts); - } + Set parts = invalidParts.computeIfAbsent(cacheId, k -> new HashSet<>()); parts.add(part); @@ -875,7 +869,7 @@ public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Thr /** {@inheritDoc} */ @Override public boolean ownsLockUnsafe(GridCacheEntryEx entry) { - GridCacheContext cacheCtx = entry.context(); + GridCacheContext cacheCtx = entry.context(); IgniteTxEntry txEntry = entry(entry.txKey()); @@ -903,7 +897,7 @@ public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Thr state = MARKED_ROLLBACK; if (log.isDebugEnabled()) - log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); + log.debug("Changed transaction state [prev=" + prev + ", new=" + state + ", tx=" + this + ']'); notifyAll(); } @@ -1277,6 +1271,9 @@ private void recordStateChangedEvent(TransactionState state) { break; } + + default: + break; } } @@ -1376,7 +1373,7 @@ protected boolean isWriteToStoreFromDhtValid(Collection store protected void sessionEnd(final Collection stores, boolean commit) throws IgniteCheckedException { Iterator it = stores.iterator(); - Set visited = new GridSetWrapper<>(new IdentityHashMap()); + Set visited = new GridSetWrapper<>(new IdentityHashMap<>()); while (it.hasNext()) { CacheStoreManager store = it.next(); @@ -1441,7 +1438,7 @@ protected final void batchStoreCommit(Iterable writeEntries) thro IgniteBiTuple res = applyTransformClosures(e, false, null); - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); GridCacheOperation op = res.get1(); KeyCacheObject key = e.key(); @@ -1610,7 +1607,7 @@ protected IgniteBiTuple applyTransformClosures( @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException { assert txEntry.op() != TRANSFORM || !F.isEmpty(txEntry.entryProcessors()) : txEntry; - GridCacheContext cacheCtx = txEntry.context(); + GridCacheContext cacheCtx = txEntry.context(); assert cacheCtx != null; @@ -1688,9 +1685,9 @@ else if (txEntry.hasOldValue()) IgniteThread.onEntryProcessorEntered(true); try { - EntryProcessor processor = t.get1(); + EntryProcessor proc = t.get1(); - procRes = processor.process(invokeEntry, t.get2()); + procRes = proc.process(invokeEntry, t.get2()); val = invokeEntry.getValue(); @@ -1813,7 +1810,7 @@ else if (op == UPDATE) GridCacheVersionedEntryEx oldEntry = old.versionedEntry(txEntry.keepBinary()); // Construct new entry info. - GridCacheContext entryCtx = txEntry.context(); + GridCacheContext entryCtx = txEntry.context(); GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry( entryCtx, @@ -1845,7 +1842,7 @@ else if (op == DELETE && resVal != null) * @return {@code True} if entry is locally mapped as a primary or back up node. */ protected boolean isNearLocallyMapped(IgniteTxEntry e, boolean primaryOnly) { - GridCacheContext cacheCtx = e.context(); + GridCacheContext cacheCtx = e.context(); if (!cacheCtx.isNear()) return false; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 1f3a3f02b8845..296823e393398 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -67,7 +67,6 @@ import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -889,9 +888,9 @@ assert ownsLock(txEntry.cached()) : boolean persistenceEnabled = CU.isPersistenceEnabled(cctx.kernalContext().config()); if (persistenceEnabled) { - GridCacheDatabaseSharedManager dbManager = (GridCacheDatabaseSharedManager)cctx.database(); + GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); - dbManager.getCheckpointer().skipCheckpointOnNodeStop(true); + dbMgr.getCheckpointer().skipCheckpointOnNodeStop(true); } throw ex; @@ -1280,9 +1279,9 @@ protected final void addInvokeResult(IgniteTxEntry txEntry, CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(txEntry.key(), key0, cacheVal, val0, ver, txEntry.keepBinary(), txEntry.cached()); - EntryProcessor entryProcessor = t.get1(); + EntryProcessor entryProc = t.get1(); - res = entryProcessor.process(invokeEntry, t.get2()); + res = entryProc.process(invokeEntry, t.get2()); val0 = invokeEntry.getValue(txEntry.keepBinary()); @@ -1364,7 +1363,7 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { * @param val Value. * @param expiryPlc Explicitly specified expiry policy. * @param invokeArgs Optional arguments for EntryProcessor. - * @param entryProcessor Entry processor. + * @param entryProc Entry processor. * @param entry Cache entry. * @param filter Filter. * @param filtersSet {@code True} if filter should be marked as set. @@ -1376,7 +1375,7 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { */ public final IgniteTxEntry addEntry(GridCacheOperation op, @Nullable CacheObject val, - @Nullable EntryProcessor entryProcessor, + @Nullable EntryProcessor entryProc, Object[] invokeArgs, GridCacheEntryEx entry, @Nullable ExpiryPolicy expiryPlc, @@ -1407,12 +1406,12 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, IgniteTxEntry txEntry; if (old != null) { - if (entryProcessor != null) { + if (entryProc != null) { assert val == null; assert op == TRANSFORM; // Will change the op. - old.addEntryProcessor(entryProcessor, invokeArgs); + old.addEntryProcessor(entryProc, invokeArgs); } else { assert old.op() != TRANSFORM; @@ -1450,7 +1449,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, this, op, val, - EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor), + EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProc), invokeArgs, hasDrTtl ? drTtl : -1L, entry, @@ -1711,10 +1710,8 @@ protected PostLockClosure1(T arg, boolean commit) { setRollbackOnly(); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw new GridClosureException(e); - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw new GridClosureException(e); }); throw new GridClosureException(e); @@ -1730,10 +1727,8 @@ protected PostLockClosure1(T arg, boolean commit) { ); if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw ex; - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw ex; }); throw ex; @@ -1763,10 +1758,8 @@ protected PostLockClosure1(T arg, boolean commit) { } catch (final IgniteCheckedException ex) { if (commit && commitAfterLock()) - return rollbackAsync().chain(new C1, T>() { - @Override public T apply(IgniteInternalFuture f) { - throw new GridClosureException(ex); - } + return rollbackAsync().chain((IgniteInternalFuture f) -> { + throw new GridClosureException(ex); }); throw new GridClosureException(ex); @@ -1845,6 +1838,6 @@ private interface NearEntryUpdateClojure { * @throws IgniteCheckedException If operation is failed. * @throws GridCacheEntryRemovedException If entry is removed. */ - public void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException; + void apply(E entry) throws IgniteCheckedException, GridCacheEntryRemovedException; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java index 48fe4c5003201..c8e28753d2822 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridMetadataAwareAdapter.java @@ -21,6 +21,7 @@ import java.util.concurrent.Callable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** @@ -36,13 +37,10 @@ public enum EntryKey { //keys sorted by usage rate, descending. CACHE_STORE_MANAGER_KEY(0), /** Predefined key. */ - CACHE_EVICTABLE_ENTRY_KEY(1), - - /** Predefined key. */ - CACHE_EVICTION_MANAGER_KEY(2); + CACHE_EVICTABLE_ENTRY_KEY(1); /** key. */ - private int key; + private final int key; /** * @param key key @@ -63,7 +61,7 @@ public int key() { /** Attributes. */ @GridToStringInclude(sensitive = true) - private Object[] data = null; + private Object[] data; /** * Copies all metadata from another instance. @@ -107,10 +105,10 @@ public void copyMeta(Object[] data) { assert val != null; synchronized (this) { - if (this.data == null) - this.data = new Object[key + 1]; - else if (this.data.length <= key) - this.data = Arrays.copyOf(this.data, key + 1); + if (data == null) + data = new Object[key + 1]; + else if (data.length <= key) + data = Arrays.copyOf(data, key + 1); V old = (V)data[key]; @@ -239,7 +237,7 @@ public boolean hasMeta(int key, V val) { assert val != null; synchronized (this) { - V v = (V)meta(key); + V v = meta(key); if (v == null) return addMeta(key, val); @@ -261,7 +259,7 @@ public V addMetaIfAbsent(int key, V val) { assert val != null; synchronized (this) { - V v = (V)meta(key); + V v = meta(key); if (v == null) addMeta(key, v = val); @@ -281,13 +279,11 @@ public V addMetaIfAbsent(int key, V val) { * @param Type of the value. * @return The value of the metadata after execution of this method. */ - @Nullable public V addMetaIfAbsent(int key, @Nullable Callable c) { - assert c != null; - + @Nullable public V addMetaIfAbsent(int key, @NotNull Callable c) { synchronized (this) { - V v = (V)meta(key); + V v = meta(key); - if (v == null && c != null) + if (v == null) try { addMeta(key, v = c.call()); }