diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java index b0f78bd329e57..10af241fe1121 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java @@ -25,10 +25,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.resources.MetricManagerResource; import org.apache.ignite.internal.util.GridLeanIdentitySet; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.F; @@ -48,7 +50,6 @@ import org.apache.ignite.resources.TaskContinuousMapperResource; import org.apache.ignite.resources.TaskSessionResource; import org.jetbrains.annotations.Nullable; -import java.util.concurrent.ConcurrentHashMap; /** * Resource container contains caches for classes used for injection. @@ -513,6 +514,9 @@ enum ResourceAnnotation { /** */ CACHE_STORE_SESSION(CacheStoreSessionResource.class), + /** */ + METRIC_MANAGER(MetricManagerResource.class), + /** */ FILESYSTEM_RESOURCE(FileSystemResource.class); @@ -537,7 +541,8 @@ public enum AnnotationSet { ResourceAnnotation.SPRING, ResourceAnnotation.IGNITE_INSTANCE, ResourceAnnotation.LOGGER, - ResourceAnnotation.SERVICE + ResourceAnnotation.SERVICE, + ResourceAnnotation.METRIC_MANAGER ), /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java index d148142435dfc..07cd319a725b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java @@ -74,6 +74,8 @@ public GridResourceProcessor(GridKernalContext ctx) { new GridResourceLoggerInjector(ctx.config().getGridLogger()); injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] = new GridResourceBasicInjector<>(ctx.grid()); + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.METRIC_MANAGER.ordinal()] = + new GridResourceSupplierInjector<>(ctx::metric); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSupplierInjector.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSupplierInjector.java new file mode 100644 index 0000000000000..679303b80660d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceSupplierInjector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.resource; + +import java.util.function.Supplier; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.managers.deployment.GridDeployment; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Simple injector which wraps resource object supplier. + * + * @param Type of injected resource. + */ +class GridResourceSupplierInjector implements GridResourceInjector { + /** Resource to inject. */ + private final Supplier supplier; + + /** + * Creates injector. + * + * @param supplier Resource supplier. + */ + GridResourceSupplierInjector(Supplier supplier) { + this.supplier = supplier; + } + + /** {@inheritDoc} */ + @Override public void inject(GridResourceField field, Object target, Class depCls, GridDeployment dep) + throws IgniteCheckedException { + GridResourceUtils.inject(field.getField(), target, supplier.get()); + } + + /** {@inheritDoc} */ + @Override public void inject(GridResourceMethod mtd, Object target, Class depCls, GridDeployment dep) + throws IgniteCheckedException { + GridResourceUtils.inject(mtd.getMethod(), target, supplier.get()); + } + + /** {@inheritDoc} */ + @Override public void undeploy(GridDeployment dep) { + /* No-op. There is no cache. */ + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridResourceSupplierInjector.class, this); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java similarity index 66% rename from modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMetricsListener.java rename to modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java index 1bdae3007eca4..21cddd4f73ef4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMetricsListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/resources/MetricManagerResource.java @@ -15,21 +15,18 @@ * limitations under the License. */ -package org.apache.ignite.internal.util.nio; +package org.apache.ignite.internal.resources; -import java.util.EventListener; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; -/** - * Metrics listener for NIO communication. - */ -public interface GridNioMetricsListener extends EventListener { - /** - * @param bytesCnt Number of sent bytes. - */ - public void onBytesSent(int bytesCnt); - - /** - * @param bytesCnt Number of received bytes. - */ - public void onBytesReceived(int bytesCnt); +/** */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.FIELD}) +public @interface MetricManagerResource { + // No-op. } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index 7af6139893a8e..2abfea74bfd8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -26,13 +26,14 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.nio.GridNioFilter; import org.apache.ignite.internal.util.nio.GridNioFilterAdapter; import org.apache.ignite.internal.util.nio.GridNioFilterChain; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFuture; import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; -import org.apache.ignite.internal.util.nio.GridNioMetricsListener; import org.apache.ignite.internal.util.nio.GridNioServerListener; import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.nio.GridNioSessionImpl; @@ -40,6 +41,11 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; +import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_NAME; +import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME; + /** * Allows to re-use existing {@link GridNioFilter}s on IPC (specifically shared memory IPC) * communications. @@ -63,25 +69,32 @@ public class IpcToNioAdapter { /** */ private final ByteBuffer writeBuf; - /** */ - private final GridNioMetricsListener metricsLsnr; + /** Received bytes count metric. */ + private final LongAdderMetric rcvdBytesCntMetric; + + /** Sent bytes count metric. */ + private final LongAdderMetric sentBytesCntMetric; /** */ private final GridNioMessageWriterFactory writerFactory; /** - * @param metricsLsnr Metrics listener. + * @param mreg Metrics registry. * @param log Log. * @param endp Endpoint. * @param lsnr Listener. * @param writerFactory Writer factory. * @param filters Filters. */ - public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp, - GridNioServerListener lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters) { - assert metricsLsnr != null; + public IpcToNioAdapter(MetricRegistry mreg, IgniteLogger log, IpcEndpoint endp, + GridNioServerListener lsnr, GridNioMessageWriterFactory writerFactory, GridNioFilter... filters + ) { + assert mreg != null; + + rcvdBytesCntMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC); + + sentBytesCntMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC); - this.metricsLsnr = metricsLsnr; this.endp = endp; this.writerFactory = writerFactory; @@ -116,7 +129,7 @@ public void serve() throws InterruptedException { int read = in.read(readBuf.array(), pos, readBuf.remaining()); if (read > 0) { - metricsLsnr.onBytesReceived(read); + rcvdBytesCntMetric.add(read); readBuf.position(0); readBuf.limit(pos + read); @@ -167,7 +180,7 @@ private GridNioFuture send(Message msg) { try { int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf, writerFactory.writer(ses)); - metricsLsnr.onBytesSent(cnt); + sentBytesCntMetric.add(cnt); } catch (IOException | IgniteCheckedException e) { return new GridNioFinishedFuture(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index ed7e929145bd3..79de224b7772a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -20,7 +20,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; /** * Implements basic lifecycle for communication clients. @@ -32,19 +31,14 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati /** Reservations. */ private final AtomicBoolean closed = new AtomicBoolean(); - /** Metrics listener. */ - protected final GridNioMetricsListener metricsLsnr; - /** */ private final int connIdx; /** * @param connIdx Connection index. - * @param metricsLsnr Metrics listener. */ - protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { + protected GridAbstractCommunicationClient(int connIdx) { this.connIdx = connIdx; - this.metricsLsnr = metricsLsnr; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 86f33e26d9ccc..5578d3b92ffa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -55,6 +55,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -129,6 +131,24 @@ public class GridNioServer { private static final boolean DISABLE_KEYSET_OPTIMIZATION = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_NO_SELECTOR_OPTS); + /** */ + public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME = "outboundMessagesQueueSize"; + + /** */ + public static final String OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC = "Number of messages waiting to be sent"; + + /** */ + public static final String RECEIVED_BYTES_METRIC_NAME = "receivedBytes"; + + /** */ + public static final String RECEIVED_BYTES_METRIC_DESC = "Total number of bytes received by current node"; + + /** */ + public static final String SENT_BYTES_METRIC_NAME = "sentBytes"; + + /** */ + public static final String SENT_BYTES_METRIC_DESC = "Total number of bytes sent by current node"; + /** * */ @@ -213,8 +233,18 @@ public class GridNioServer { /** Whether direct mode is used. */ private final boolean directMode; - /** Metrics listener. */ - private final GridNioMetricsListener metricsLsnr; + /** */ + @Nullable private final MetricRegistry mreg; + + /** Received bytes count metric. */ + @Nullable private final LongAdderMetric rcvdBytesCntMetric; + + /** Sent bytes count metric. */ + @Nullable private final LongAdderMetric sentBytesCntMetric; + + /** Outbound messages queue size. */ + @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric; + /** Sessions. */ private final GridConcurrentHashSet sessions = new GridConcurrentHashSet<>(); @@ -267,12 +297,12 @@ public class GridNioServer { * @param sndQueueLimit Send queue limit. * @param directMode Whether direct mode is used. * @param daemon Daemon flag to create threads. - * @param metricsLsnr Metrics listener. * @param writerFactory Writer factory. * @param skipRecoveryPred Skip recovery predicate. * @param msgQueueLsnr Message queue size listener. * @param readWriteSelectorsAssign If {@code true} then in/out connections are assigned to even/odd workers. * @param workerLsnr Worker lifecycle listener. + * @param mreg Metrics registry. * @param filters Filters for this server. * @throws IgniteCheckedException If failed. */ @@ -293,12 +323,12 @@ private GridNioServer( int sndQueueLimit, boolean directMode, boolean daemon, - GridNioMetricsListener metricsLsnr, GridNioMessageWriterFactory writerFactory, IgnitePredicate skipRecoveryPred, IgniteBiInClosure msgQueueLsnr, boolean readWriteSelectorsAssign, @Nullable GridWorkerListener workerLsnr, + @Nullable MetricRegistry mreg, GridNioFilter... filters ) throws IgniteCheckedException { if (port != -1) @@ -382,7 +412,6 @@ private GridNioServer( } this.directMode = directMode; - this.metricsLsnr = metricsLsnr; this.writerFactory = writerFactory; this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.alwaysFalse(); @@ -404,6 +433,19 @@ private GridNioServer( } this.balancer = balancer0; + + this.mreg = mreg; + + rcvdBytesCntMetric = mreg == null ? + null : mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC); + + sentBytesCntMetric = mreg == null ? + null : mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC); + + outboundMessagesQueueSizeMetric = mreg == null ? null : mreg.longAdderMetric( + OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME, + OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC + ); } /** @@ -1133,8 +1175,8 @@ else if (cnt == 0) if (log.isTraceEnabled()) log.trace("Bytes received [sockCh=" + sockCh + ", cnt=" + cnt + ']'); - if (metricsLsnr != null) - metricsLsnr.onBytesReceived(cnt); + if (rcvdBytesCntMetric != null) + rcvdBytesCntMetric.add(cnt); ses.bytesReceived(cnt); @@ -1195,8 +1237,8 @@ else if (cnt == 0) if (log.isTraceEnabled()) log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (sentBytesCntMetric != null) + sentBytesCntMetric.add(cnt); ses.bytesSent(cnt); } @@ -1296,8 +1338,8 @@ protected DirectNioClientWorker( if (cnt == 0) return; - if (metricsLsnr != null) - metricsLsnr.onBytesReceived(cnt); + if (rcvdBytesCntMetric != null) + rcvdBytesCntMetric.add(cnt); ses.bytesReceived(cnt); onRead(cnt); @@ -1377,8 +1419,8 @@ private void processWriteSsl(SelectionKey key) throws IOException { if (sslNetBuf != null) { int cnt = sockCh.write(sslNetBuf); - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (sentBytesCntMetric != null) + sentBytesCntMetric.add(cnt); ses.bytesSent(cnt); @@ -1494,8 +1536,8 @@ private void processWriteSsl(SelectionKey key) throws IOException { if (log.isTraceEnabled()) log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (sentBytesCntMetric != null) + sentBytesCntMetric.add(cnt); ses.bytesSent(cnt); } @@ -1551,8 +1593,8 @@ private boolean writeSslSystem(GridSelectorNioSessionImpl ses, WritableByteChann while ((buf = queue.peek()) != null) { int cnt = sockCh.write(buf); - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (sentBytesCntMetric != null) + sentBytesCntMetric.add(cnt); ses.bytesSent(cnt); @@ -1681,8 +1723,8 @@ private void processWrite0(SelectionKey key) throws IOException { if (log.isTraceEnabled()) log.trace("Bytes sent [sockCh=" + sockCh + ", cnt=" + cnt + ']'); - if (metricsLsnr != null) - metricsLsnr.onBytesSent(cnt); + if (sentBytesCntMetric != null) + sentBytesCntMetric.add(cnt); ses.bytesSent(cnt); onWrite(cnt); @@ -2583,6 +2625,7 @@ private void register(NioOperationFuture fut) { (InetSocketAddress)sockCh.getRemoteAddress(), fut.accepted(), sndQueueLimit, + mreg, writeBuf, readBuf); @@ -2863,12 +2906,10 @@ final void reset0() { * @return Write queue size. */ public int outboundMessagesQueueSize() { - int res = 0; - - for (GridSelectorNioSessionImpl ses : sessions) - res += ses.writeQueueSize(); + if (outboundMessagesQueueSizeMetric == null) + return -1; - return res; + return (int) outboundMessagesQueueSizeMetric.value(); } /** @@ -3672,9 +3713,6 @@ public static class Builder { /** Whether direct mode is used. */ private boolean directMode; - /** Metrics listener. */ - private GridNioMetricsListener metricsLsnr; - /** NIO filters. */ private GridNioFilter[] filters; @@ -3708,6 +3746,9 @@ public static class Builder { /** Worker lifecycle listener to be used by server's worker threads. */ private GridWorkerListener workerLsnr; + /** Metrics registry. */ + private MetricRegistry mreg; + /** * Finishes building the instance. * @@ -3732,12 +3773,12 @@ public GridNioServer build() throws IgniteCheckedException { sndQueueLimit, directMode, daemon, - metricsLsnr, writerFactory, skipRecoveryPred, msgQueueLsnr, readWriteSelectorsAssign, workerLsnr, + mreg, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS ); @@ -3913,16 +3954,6 @@ public Builder directMode(boolean directMode) { return this; } - /** - * @param metricsLsnr Metrics listener. - * @return This for chaining. - */ - public Builder metricsListener(GridNioMetricsListener metricsLsnr) { - this.metricsLsnr = metricsLsnr; - - return this; - } - /** * @param filters NIO filters. * @return This for chaining. @@ -4002,6 +4033,16 @@ public Builder workerListener(GridWorkerListener workerLsnr) { return this; } + + /** + * @param mreg Metrics registry. + * @return This for chaining. + */ + public Builder metricRegistry(MetricRegistry mreg) { + this.mreg = mreg; + + return this; + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index ac2fc6133d54c..87ecb301a5780 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -28,12 +28,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.util.deque.FastSizeDeque; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME; + /** * Session implementation bound to selector API and socket API. * Note that this implementation requires non-null values for local and remote @@ -81,6 +86,9 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr /** Close channel on session #close() called. */ private volatile boolean closeSocket = true; + /** Outbound messages queue size metric. */ + @Nullable private final LongAdderMetric outboundMessagesQueueSizeMetric; + /** * Creates session instance. * @@ -102,6 +110,7 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr InetSocketAddress rmtAddr, boolean accepted, int sndQueueLimit, + @Nullable MetricRegistry mreg, @Nullable ByteBuffer writeBuf, @Nullable ByteBuffer readBuf ) { @@ -132,6 +141,11 @@ public class GridSelectorNioSessionImpl extends GridNioSessionImpl implements Gr this.readBuf = readBuf; } + + outboundMessagesQueueSizeMetric = mreg == null ? null : mreg.longAdderMetric( + OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_NAME, + OUTBOUND_MESSAGES_QUEUE_SIZE_METRIC_DESC + ); } /** {@inheritDoc} */ @@ -295,6 +309,9 @@ int offerSystemFuture(SessionWriteRequest writeFut) { assert res : "Future was not added to queue"; + if (outboundMessagesQueueSizeMetric != null) + outboundMessagesQueueSizeMetric.increment(); + return queue.sizex(); } @@ -320,6 +337,9 @@ int offerFuture(SessionWriteRequest writeFut) { assert res : "Future was not added to queue"; + if (outboundMessagesQueueSizeMetric != null) + outboundMessagesQueueSizeMetric.increment(); + return queue.sizex(); } @@ -332,6 +352,9 @@ void resend(Collection futs) { boolean add = queue.addAll(futs); assert add; + + if (outboundMessagesQueueSizeMetric != null) + outboundMessagesQueueSizeMetric.add(futs.size()); } /** @@ -341,6 +364,9 @@ void resend(Collection futs) { SessionWriteRequest last = queue.poll(); if (last != null) { + if (outboundMessagesQueueSizeMetric != null) + outboundMessagesQueueSizeMetric.decrement(); + if (sem != null && !last.messageThread()) sem.release(); @@ -371,7 +397,12 @@ void resend(Collection futs) { boolean removeFuture(SessionWriteRequest fut) { assert closed(); - return queue.removeLastOccurrence(fut); + boolean rmv = queue.removeLastOccurrence(fut); + + if (rmv && outboundMessagesQueueSizeMetric != null) + outboundMessagesQueueSizeMetric.decrement(); + + return rmv; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index f93738c18d4c9..fce6ea2628d7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -26,6 +26,8 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryClientEndpoint; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.internal.S; @@ -33,6 +35,9 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFormatter; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME; /** * @@ -47,9 +52,12 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien /** */ private final MessageFormatter formatter; + /** Sent bytes count metric. */ + @Nullable protected final AtomicLongMetric sentBytesCntMetric; + /** * @param connIdx Connection index. - * @param metricsLsnr Metrics listener. + * @param mreg Metrics registry. * @param port Shared memory IPC server port. * @param connTimeout Connection timeout. * @param log Logger. @@ -58,15 +66,15 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien */ public GridShmemCommunicationClient( int connIdx, - GridNioMetricsListener metricsLsnr, + MetricRegistry mreg, int port, long connTimeout, IgniteLogger log, - MessageFormatter formatter) - throws IgniteCheckedException { - super(connIdx, metricsLsnr); + MessageFormatter formatter + ) throws IgniteCheckedException { + super(connIdx); - assert metricsLsnr != null; + assert mreg != null; assert port > 0 && port < 0xffff; assert connTimeout >= 0; @@ -77,6 +85,8 @@ public GridShmemCommunicationClient( writeBuf.order(ByteOrder.nativeOrder()); this.formatter = formatter; + + sentBytesCntMetric = mreg.findMetric(SENT_BYTES_METRIC_NAME); } /** {@inheritDoc} */ @@ -111,7 +121,7 @@ public GridShmemCommunicationClient( try { shmem.outputStream().write(data, 0, len); - metricsLsnr.onBytesSent(len); + sentBytesCntMetric.add(len); } catch (IOException e) { throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); @@ -133,7 +143,7 @@ public GridShmemCommunicationClient( try { int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer(nodeId)); - metricsLsnr.onBytesSent(cnt); + sentBytesCntMetric.add(cnt); } catch (IOException e) { throw new IgniteCheckedException("Failed to send message to remote node: " + shmem, e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 98f694162593a..768e62461a75a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -52,7 +52,7 @@ public GridTcpNioCommunicationClient( GridNioSession ses, IgniteLogger log ) { - super(connIdx, null); + super(connIdx); assert ses != null; assert log != null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java index 1ca54517b65ee..4fdfa1370d4a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java @@ -22,45 +22,81 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.LongAdder; - +import java.util.function.Function; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.util.nio.GridNioMetricsListener; -import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.processors.metric.GridMetricManager; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.metric.LongMetric; +import org.apache.ignite.spi.metric.Metric; + +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.SEPARATOR; +import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName; +import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.RECEIVED_BYTES_METRIC_NAME; +import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_DESC; +import static org.apache.ignite.internal.util.nio.GridNioServer.SENT_BYTES_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_TYPE_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_DESC; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_METRIC_NAME; /** * Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}. */ -public class TcpCommunicationMetricsListener implements GridNioMetricsListener{ - /** Counter factory. */ - private static final Callable HOLDER_FACTORY = new Callable() { - @Override public LongHolder call() { - return new LongHolder(); - } - }; +class TcpCommunicationMetricsListener { + /** Metrics manager. */ + private final GridMetricManager mmgr; - /** Received bytes count. */ - private final LongAdder rcvdBytesCnt = new LongAdder(); - - /** Sent bytes count.*/ - private final LongAdder sentBytesCnt = new LongAdder(); + /** Metrics registry. */ + private final MetricRegistry mreg; /** All registered metrics. */ private final Set allMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>()); /** Thread-local metrics. */ - private final ThreadLocal threadMetrics = new ThreadLocal() { - @Override protected ThreadMetrics initialValue() { - ThreadMetrics metrics = new ThreadMetrics(); + private final ThreadLocal threadMetrics = ThreadLocal.withInitial(() -> { + ThreadMetrics metrics = new ThreadMetrics(); - allMetrics.add(metrics); + allMetrics.add(metrics); - return metrics; - } - }; + return metrics; + }); + + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByType}. */ + private final Function sentMsgsCntByTypeMetricFactory; + + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByType}. */ + private final Function rcvdMsgsCntByTypeMetricFactory; + + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code sentMsgsMetricsByNodeId}. */ + private final Function sentMsgsCntByNodeIdMetricFactory; + + /** Function to be used in {@link Map#computeIfAbsent(Object, Function)} of {@code rcvdMsgsMetricsByNodeId}. */ + private final Function rcvdMsgsCntByNodeIdMetricFactory; + + /** Sent bytes count metric.*/ + private final LongAdderMetric sentBytesMetric; + + /** Received bytes count metric. */ + private final LongAdderMetric rcvdBytesMetric; + + /** Sent messages count metric. */ + private final LongAdderMetric sentMsgsMetric; + + /** Received messages count metric. */ + private final LongAdderMetric rcvdMsgsMetric; /** Method to synchronize access to message type map. */ private final Object msgTypMapMux = new Object(); @@ -68,14 +104,49 @@ public class TcpCommunicationMetricsListener implements GridNioMetricsListener{ /** Message type map. */ private volatile Map msgTypMap; - /** {@inheritDoc} */ - @Override public void onBytesSent(int bytesCnt) { - sentBytesCnt.add(bytesCnt); + /** */ + public TcpCommunicationMetricsListener(GridMetricManager mmgr) { + this.mmgr = mmgr; + + mreg = mmgr.registry(COMMUNICATION_METRICS_GROUP_NAME); + + sentMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric( + sentMessagesByTypeMetricName(directType), + SENT_MESSAGES_BY_TYPE_METRIC_DESC + ); + rcvdMsgsCntByTypeMetricFactory = directType -> mreg.longAdderMetric( + receivedMessagesByTypeMetricName(directType), + RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC + ); + + sentMsgsCntByNodeIdMetricFactory = nodeId -> + mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())) + .findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + + rcvdMsgsCntByNodeIdMetricFactory = nodeId -> + mmgr.registry(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())) + .findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); + + sentBytesMetric = mreg.longAdderMetric(SENT_BYTES_METRIC_NAME, SENT_BYTES_METRIC_DESC); + rcvdBytesMetric = mreg.longAdderMetric(RECEIVED_BYTES_METRIC_NAME, RECEIVED_BYTES_METRIC_DESC); + + sentMsgsMetric = mreg.longAdderMetric(SENT_MESSAGES_METRIC_NAME, SENT_MESSAGES_METRIC_DESC); + rcvdMsgsMetric = mreg.longAdderMetric(RECEIVED_MESSAGES_METRIC_NAME, RECEIVED_MESSAGES_METRIC_DESC); + + mmgr.addMetricRegistryCreationListener(mreg -> { + // Metrics for the specific nodes. + if (!mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) + return; + + mreg.longAdderMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME, SENT_MESSAGES_BY_NODE_ID_METRIC_DESC); + + mreg.longAdderMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME, RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC); + }); } - /** {@inheritDoc} */ - @Override public void onBytesReceived(int bytesCnt) { - rcvdBytesCnt.add(bytesCnt); + /** Metrics registry. */ + public MetricRegistry metricRegistry() { + return mreg; } /** @@ -93,9 +164,9 @@ public void onMessageSent(Message msg, UUID nodeId) { updateMessageTypeMap(msg); - ThreadMetrics metrics = threadMetrics.get(); + sentMsgsMetric.increment(); - metrics.onMessageSent(msg, nodeId); + threadMetrics.get().onMessageSent(msg, nodeId); } } @@ -114,9 +185,9 @@ public void onMessageReceived(Message msg, UUID nodeId) { updateMessageTypeMap(msg); - ThreadMetrics metrics = threadMetrics.get(); + rcvdMsgsMetric.increment(); - metrics.onMessageReceived(msg, nodeId); + threadMetrics.get().onMessageReceived(msg, nodeId); } } @@ -126,17 +197,9 @@ public void onMessageReceived(Message msg, UUID nodeId) { * @return Sent messages count. */ public int sentMessagesCount() { - long res = 0; - - for (ThreadMetrics metrics : allMetrics) - res += metrics.sentMsgsCnt; + int res0 = (int)sentMsgsMetric.value(); - int res0 = (int)res; - - if (res0 < 0) - res0 = Integer.MAX_VALUE; - - return res0; + return res0 < 0 ? Integer.MAX_VALUE : res0; } /** @@ -145,7 +208,7 @@ public int sentMessagesCount() { * @return Sent bytes count. */ public long sentBytesCount() { - return sentBytesCnt.longValue(); + return sentBytesMetric.value(); } /** @@ -154,17 +217,9 @@ public long sentBytesCount() { * @return Received messages count. */ public int receivedMessagesCount() { - long res = 0; - - for (ThreadMetrics metrics : allMetrics) - res += metrics.rcvdMsgsCnt; + int res0 = (int)rcvdMsgsMetric.value(); - int res0 = (int)res; - - if (res0 < 0) - res0 = Integer.MAX_VALUE; - - return res0; + return res0 < 0 ? Integer.MAX_VALUE : res0; } /** @@ -173,7 +228,7 @@ public int receivedMessagesCount() { * @return Received bytes count. */ public long receivedBytesCount() { - return rcvdBytesCnt.longValue(); + return rcvdBytesMetric.value(); } /** @@ -182,35 +237,7 @@ public long receivedBytesCount() { * @return Map containing message types and respective counts. */ public Map receivedMessagesByType() { - Map res = new HashMap<>(); - - for (ThreadMetrics metrics : allMetrics) - addMetrics(res, metrics.rcvdMsgsCntByType); - - return convertMessageTypes(res); - } - - /** - * Convert message types. - * - * @param input Input map. - * @return Result map. - */ - private Map convertMessageTypes(Map input) { - Map res = new HashMap<>(input.size()); - - Map msgTypMap0 = msgTypMap; - - if (msgTypMap0 != null) { - for (Map.Entry inputEntry : input.entrySet()) { - String typeName = msgTypMap0.get(inputEntry.getKey()); - - if (typeName != null) - res.put(typeName, inputEntry.getValue()); - } - } - - return res; + return collectMessagesCountByType(RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME + SEPARATOR); } /** @@ -219,12 +246,7 @@ private Map convertMessageTypes(Map input) { * @return Map containing sender nodes and respective counts. */ public Map receivedMessagesByNode() { - Map res = new HashMap<>(); - - for (ThreadMetrics metrics : allMetrics) - addMetrics(res, metrics.rcvdMsgsCntByNode); - - return res; + return collectMessagesCountByNodeId(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); } /** @@ -233,12 +255,7 @@ public Map receivedMessagesByNode() { * @return Map containing message types and respective counts. */ public Map sentMessagesByType() { - Map res = new HashMap<>(); - - for (ThreadMetrics metrics : allMetrics) - addMetrics(res, metrics.sentMsgsCntByType); - - return convertMessageTypes(res); + return collectMessagesCountByType(SENT_MESSAGES_BY_TYPE_METRIC_NAME + SEPARATOR); } /** @@ -247,10 +264,48 @@ public Map sentMessagesByType() { * @return Map containing receiver nodes and respective counts. */ public Map sentMessagesByNode() { + return collectMessagesCountByNodeId(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + } + + /** */ + protected Map collectMessagesCountByType(String prefix) { + Map res = new HashMap<>(); + + prefix = metricName(COMMUNICATION_METRICS_GROUP_NAME, prefix); + + for (Metric metric : mreg) { + if (metric.name().startsWith(prefix)) { + short directType = Short.parseShort(metric.name().substring(prefix.length())); + + Map msgTypMap0 = msgTypMap; + + if (msgTypMap0 != null) { + String typeName = msgTypMap0.get(directType); + + if (typeName != null) + res.put(typeName, ((LongMetric)metric).value()); + } + } + } + + return res; + } + + /** */ + protected Map collectMessagesCountByNodeId(String metricName) { Map res = new HashMap<>(); - for (ThreadMetrics metrics : allMetrics) - addMetrics(res, metrics.sentMsgsCntByNode); + String mregPrefix = COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR; + + for (MetricRegistry mreg : mmgr) { + if (mreg.name().startsWith(mregPrefix)) { + String nodeIdStr = mreg.name().substring(mregPrefix.length()); + + UUID nodeId = UUID.fromString(nodeIdStr); + + res.put(nodeId, mreg.findMetric(metricName).value()); + } + } return res; } @@ -259,28 +314,38 @@ public Map sentMessagesByNode() { * Resets metrics for this instance. */ public void resetMetrics() { - for (ThreadMetrics metrics : allMetrics) - metrics.reset(); + rcvdMsgsMetric.reset(); + sentMsgsMetric.reset(); + + sentBytesMetric.reset(); + rcvdBytesMetric.reset(); - sentBytesCnt.reset(); - rcvdBytesCnt.reset(); + for (Metric metric : mreg) { + if (metric.name().startsWith(SENT_MESSAGES_BY_TYPE_METRIC_NAME)) + metric.reset(); + else if (metric.name().startsWith(RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME)) + metric.reset(); + } + + for (MetricRegistry mreg : mmgr) { + if (mreg.name().startsWith(COMMUNICATION_METRICS_GROUP_NAME + SEPARATOR)) { + mreg.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME).reset(); + + mreg.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME).reset(); + } + } } /** - * Add single metrics to the total. - * - * @param total Total. - * @param current Current metrics. + * @param nodeId Left node id. */ - private void addMetrics(Map total, Map current) { - for (Map.Entry entry : current.entrySet()) { - T key = entry.getKey(); - long val = entry.getValue().val; - - Long prevVal = total.get(key); - - total.put(key, prevVal == null ? val : prevVal + val); + public void onNodeLeft(UUID nodeId) { + for (ThreadMetrics threadMetrics : allMetrics) { + threadMetrics.rcvdMsgsMetricsByNodeId = new HashMap<>(); + threadMetrics.sentMsgsMetricsByNodeId = new HashMap<>(); } + + mmgr.remove(metricName(COMMUNICATION_METRICS_GROUP_NAME, nodeId.toString())); } /** @@ -315,42 +380,36 @@ private void updateMessageTypeMap(Message msg) { } } - /** - * Long value holder. - */ - private static class LongHolder { - /** Value. */ - private long val; + /** Generate metric name by message direct type id. */ + public static String sentMessagesByTypeMetricName(Short directType) { + return metricName(SENT_MESSAGES_BY_TYPE_METRIC_NAME, directType.toString()); + } - /** - * Increment value. - */ - private void increment() { - val++; - } + /** Generate metric name by message direct type id. */ + public static String receivedMessagesByTypeMetricName(Short directType) { + return metricName(RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME, directType.toString()); } /** * Thread-local metrics. */ - private static class ThreadMetrics { - /** Received messages count. */ - private long rcvdMsgsCnt; + private class ThreadMetrics { + /** Sent messages count metrics grouped by message type. */ + private final Map sentMsgsMetricsByType = new HashMap<>(); - /** Sent messages count.*/ - private long sentMsgsCnt; + /** Received messages count metrics grouped by message type. */ + private final Map rcvdMsgsMetricsByType = new HashMap<>(); - /** Received messages count grouped by message type. */ - private final HashMap rcvdMsgsCntByType = new HashMap<>(); - - /** Received messages count grouped by sender. */ - private final HashMap rcvdMsgsCntByNode = new HashMap<>(); + /** + * Sent messages count metrics grouped by message node id. + */ + public volatile Map sentMsgsMetricsByNodeId = new HashMap<>(); - /** Sent messages count grouped by message type. */ - private final HashMap sentMsgsCntByType = new HashMap<>(); + /** + * Received messages metrics count grouped by message node id. + */ + public volatile Map rcvdMsgsMetricsByNodeId = new HashMap<>(); - /** Sent messages count grouped by receiver. */ - private final HashMap sentMsgsCntByNode = new HashMap<>(); /** * Collects statistics for message sent by SPI. @@ -359,16 +418,9 @@ private static class ThreadMetrics { * @param nodeId Receiver node id. */ private void onMessageSent(Message msg, UUID nodeId) { - sentMsgsCnt++; - - LongHolder cntByType = F.addIfAbsent(sentMsgsCntByType, msg.directType(), HOLDER_FACTORY); - LongHolder cntByNode = F.addIfAbsent(sentMsgsCntByNode, nodeId, HOLDER_FACTORY); + sentMsgsMetricsByType.computeIfAbsent(msg.directType(), sentMsgsCntByTypeMetricFactory).increment(); - assert cntByType != null; - assert cntByNode != null; - - cntByType.increment(); - cntByNode.increment(); + sentMsgsMetricsByNodeId.computeIfAbsent(nodeId, sentMsgsCntByNodeIdMetricFactory).increment(); } /** @@ -378,30 +430,9 @@ private void onMessageSent(Message msg, UUID nodeId) { * @param nodeId Sender node id. */ private void onMessageReceived(Message msg, UUID nodeId) { - rcvdMsgsCnt++; - - LongHolder cntByType = F.addIfAbsent(rcvdMsgsCntByType, msg.directType(), HOLDER_FACTORY); - LongHolder cntByNode = F.addIfAbsent(rcvdMsgsCntByNode, nodeId, HOLDER_FACTORY); - - assert cntByType != null; - assert cntByNode != null; - - cntByType.increment(); - cntByNode.increment(); - } - - /** - * Reset metrics. - */ - private void reset() { - rcvdMsgsCnt = 0; - sentMsgsCnt = 0; - - sentMsgsCntByType.clear(); - sentMsgsCntByNode.clear(); + rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), rcvdMsgsCntByTypeMetricFactory).increment(); - rcvdMsgsCntByType.clear(); - rcvdMsgsCntByNode.clear(); + rcvdMsgsMetricsByNodeId.computeIfAbsent(nodeId, rcvdMsgsCntByNodeIdMetricFactory).increment(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 99dd212da387f..0a21d5d72b5ab 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -75,6 +75,9 @@ import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.metric.GridMetricManager; +import org.apache.ignite.internal.processors.metric.impl.MetricUtils; +import org.apache.ignite.internal.resources.MetricManagerResource; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -385,11 +388,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati public static final int MAX_CONN_PER_NODE = 1024; /** No-op runnable. */ - private static final IgniteRunnable NOOP = new IgniteRunnable() { - @Override public void run() { - // No-op. - } - }; + private static final IgniteRunnable NOOP = () -> {}; /** Node ID message type. */ public static final short NODE_ID_MSG_TYPE = -1; @@ -403,6 +402,45 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** Handshake wait message type. */ public static final short HANDSHAKE_WAIT_MSG_TYPE = -28; + /** Communication metrics group name. */ + public static final String COMMUNICATION_METRICS_GROUP_NAME = MetricUtils.metricName("communication", "tcp"); + + /** */ + public static final String SENT_MESSAGES_METRIC_NAME = "sentMessagesCount"; + + /** */ + public static final String SENT_MESSAGES_METRIC_DESC = "Total number of messages sent by current node"; + + /** */ + public static final String RECEIVED_MESSAGES_METRIC_NAME = "receivedMessagesCount"; + + /** */ + public static final String RECEIVED_MESSAGES_METRIC_DESC = "Total number of messages received by current node"; + + /** */ + public static final String SENT_MESSAGES_BY_TYPE_METRIC_NAME = "sentMessagesByType"; + + /** */ + public static final String SENT_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type sent by current node"; + + /** */ + public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_NAME = "receivedMessagesByType"; + + /** */ + public static final String RECEIVED_MESSAGES_BY_TYPE_METRIC_DESC = "Total number of messages with given type received by current node"; + + /** */ + public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_NAME = "sentMessagesToNode"; + + /** */ + public static final String SENT_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total number of messages sent by current node to the given node"; + + /** */ + public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME = "receivedMessagesFromNode"; + + /** */ + public static final String RECEIVED_MESSAGES_BY_NODE_ID_METRIC_DESC = "Total number of messages received by current node from the given node"; + /** */ private ConnectGateway connectGate; @@ -1287,7 +1325,7 @@ class ConnectClosure implements IgniteInClosure { private volatile boolean stopping; /** Statistics. */ - private final TcpCommunicationMetricsListener metricsLsnr = new TcpCommunicationMetricsListener(); + private TcpCommunicationMetricsListener metricsLsnr; /** Client connect futures. */ private final ConcurrentMap> clientFuts = @@ -1351,6 +1389,13 @@ public AddressResolver getAddressResolver() { } } + /** */ + @MetricManagerResource + private void injectMetricManager(GridMetricManager mmgr) { + if (mmgr != null) + metricsLsnr = new TcpCommunicationMetricsListener(mmgr); + } + /** * Sets local host address for socket binding. Note that one node could have * additional addresses beside the loopback one. This configuration @@ -2532,7 +2577,6 @@ private GridNioServer resetNioServer() throws IgniteCheckedException { .socketReceiveBufferSize(sockRcvBuf) .sendQueueLimit(msgQueueLimit) .directMode(true) - .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) .selectorSpins(selectorSpins) .filters(filters) @@ -2544,7 +2588,8 @@ private GridNioServer resetNioServer() throws IgniteCheckedException { if (ignite instanceof IgniteEx) { IgniteEx igniteEx = (IgniteEx)ignite; - builder.workerListener(igniteEx.context().workersRegistry()); + builder.workerListener(igniteEx.context().workersRegistry()) + .metricRegistry(igniteEx.context().metric().registry(COMMUNICATION_METRICS_GROUP_NAME)); } GridNioServer srvr = builder.build(); @@ -2727,6 +2772,8 @@ private GridNioServer resetNioServer() throws IgniteCheckedException { void onNodeLeft(UUID nodeId) { assert nodeId != null; + metricsLsnr.onNodeLeft(nodeId); + GridCommunicationClient[] clients0 = clients.remove(nodeId); if (clients0 != null) { @@ -3185,7 +3232,7 @@ private String clientString(GridCommunicationClient client, ClusterNode node) th try { client = new GridShmemCommunicationClient( connIdx, - metricsLsnr, + metricsLsnr.metricRegistry(), port, timeoutHelper.nextTimeoutChunk(connTimeout), log, @@ -4518,7 +4565,7 @@ private ShmemWorker(IpcEndpoint endpoint) { }; IpcToNioAdapter adapter = new IpcToNioAdapter<>( - metricsLsnr, + metricsLsnr.metricRegistry(), log, endpoint, srvLsnr, diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java index b84211dc73f2a..399b8a36361ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java @@ -34,6 +34,9 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.metric.MetricRegistry; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; +import org.apache.ignite.internal.processors.metric.impl.MetricUtils; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; @@ -45,6 +48,10 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME; +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.SENT_MESSAGES_BY_NODE_ID_METRIC_NAME; + /** * Test for TcpCommunicationSpi statistics. */ @@ -95,6 +102,8 @@ private class SynchronizedCommunicationSpi extends TcpCommunicationSpi { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + cfg.setConsistentId(igniteInstanceName); + TcpCommunicationSpi spi = new SynchronizedCommunicationSpi(); cfg.setCommunicationSpi(spi); @@ -132,6 +141,19 @@ public void testStatistics() throws Exception { startGrids(2); try { + UUID node0Id = grid(0).localNode().id(); + UUID node1Id = grid(1).localNode().id(); + + String node0regName = MetricUtils.metricName( + COMMUNICATION_METRICS_GROUP_NAME, + node0Id.toString() + ); + + String node1regName = MetricUtils.metricName( + COMMUNICATION_METRICS_GROUP_NAME, + node1Id.toString() + ); + // Send custom message from node0 to node1. grid(0).context().io().sendToGridTopic(grid(1).cluster().localNode(), GridTopic.TOPIC_IO_TEST, new GridTestMessage(), GridIoPolicy.PUBLIC_POOL); @@ -179,6 +201,25 @@ public void testStatistics() throws Exception { assertEquals(1, msgsSentByType0.get(GridTestMessage.class.getName()).longValue()); assertEquals(1, msgsReceivedByType1.get(GridTestMessage.class.getName()).longValue()); + + MetricRegistry mreg0 = grid(0).context().metric().registry(node1regName); + MetricRegistry mreg1 = grid(1).context().metric().registry(node0regName); + + LongAdderMetric sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + assertNotNull(sentMetric); + assertEquals(mbean0.getSentMessagesCount(), sentMetric.value()); + + LongAdderMetric rcvMetric = mreg1.findMetric(RECEIVED_MESSAGES_BY_NODE_ID_METRIC_NAME); + assertNotNull(rcvMetric); + assertEquals(mbean1.getReceivedMessagesCount(), rcvMetric.value()); + + stopGrid(1); + + mreg0 = grid(0).context().metric().registry(node1regName); + + sentMetric = mreg0.findMetric(SENT_MESSAGES_BY_NODE_ID_METRIC_NAME); + assertNotNull(sentMetric); // Automatically generated by MetricRegistryCreationListener. + assertEquals(0, sentMetric.value()); } } finally { diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index a44feedee1059..4e5db06bd4552 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -31,10 +31,13 @@ import org.apache.ignite.internal.GridLoggerProxy; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.LongJVMPauseDetector; +import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi; import org.apache.ignite.testframework.GridTestUtils; /** @@ -94,6 +97,12 @@ public GridTestKernalContext(IgniteLogger log, IgniteConfiguration cfg) { GridTestUtils.setFieldValue(grid(), "ctx", this); config().setGridLogger(log); + + if (cfg.getMetricExporterSpi() == null || cfg.getMetricExporterSpi().length == 0) + cfg.setMetricExporterSpi(new NoopMetricExporterSpi()); + + add(new GridMetricManager(this)); + add(new GridResourceProcessor(this)); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java index a424779cf9500..a154fe5443608 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteTestResources.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; +import org.apache.ignite.internal.resources.MetricManagerResource; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.logger.NullLogger; @@ -74,6 +75,9 @@ public class IgniteTestResources { /** */ private IgniteConfiguration cfg; + /** */ + private GridTestKernalContext ctx; + /** */ private GridResourceProcessor rsrcProc; @@ -95,7 +99,9 @@ public IgniteTestResources() throws IgniteCheckedException { this.jmx = prepareMBeanServer(); - this.rsrcProc = new GridResourceProcessor(new GridTestKernalContext(this.log)); + this.ctx = new GridTestKernalContext(log); + + this.rsrcProc = new GridResourceProcessor(ctx); } /** @@ -105,7 +111,8 @@ public IgniteTestResources(IgniteConfiguration cfg) throws IgniteCheckedExceptio this.cfg = cfg; this.log = rootLog.getLogger(getClass()); this.jmx = prepareMBeanServer(); - this.rsrcProc = new GridResourceProcessor(new GridTestKernalContext(this.log, this.cfg)); + this.ctx = new GridTestKernalContext(log, this.cfg); + this.rsrcProc = new GridResourceProcessor(ctx); } /** @@ -116,7 +123,8 @@ public IgniteTestResources(MBeanServer jmx) throws IgniteCheckedException { this.jmx = jmx; this.log = rootLog.getLogger(getClass()); - this.rsrcProc = new GridResourceProcessor(new GridTestKernalContext(this.log)); + this.ctx = new GridTestKernalContext(log); + this.rsrcProc = new GridResourceProcessor(ctx); } /** @@ -127,7 +135,8 @@ public IgniteTestResources(IgniteLogger log) throws IgniteCheckedException { this.log = log.getLogger(getClass()); this.jmx = prepareMBeanServer(); - this.rsrcProc = new GridResourceProcessor(new GridTestKernalContext(this.log)); + this.ctx = new GridTestKernalContext(log); + this.rsrcProc = new GridResourceProcessor(ctx); } /** @@ -185,6 +194,7 @@ public void inject(Object target) throws IgniteCheckedException { rsrcProc.injectBasicResource(target, LoggerResource.class, getLogger().getLogger(target.getClass())); rsrcProc.injectBasicResource(target, IgniteInstanceResource.class, new IgniteMock(null, locHost, nodeId, getMarshaller(), jmx, home, cfg)); + rsrcProc.injectBasicResource(target, MetricManagerResource.class, ctx.metric()); } /**