diff --git a/build.sbt b/build.sbt index 21f88cf130..2f6e97e74d 100644 --- a/build.sbt +++ b/build.sbt @@ -668,7 +668,15 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.reset"), ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.lens"), // this filter is particulary terrible, because it can also mask real issues :( - ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens") + ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"), + // package-private classes moved to the `cats.effect.unsafe.metrics` package + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation$"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions @@ -835,7 +843,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*"), // introduced by #3695, which ported fiber monitoring to Native // internal API change - ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JsCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JsCpuStarvationMetrics$") ) }, mimaBinaryIssueFilters ++= { @@ -870,7 +883,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[MissingClassProblem]( "cats.effect.unsafe.PollingExecutorScheduler$SleepTask"), ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"), - ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.NativeCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.NativeCpuStarvationMetrics$") ) ) diff --git a/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala new file mode 100644 index 0000000000..724ef3c50c --- /dev/null +++ b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +import scala.concurrent.ExecutionContext + +private[metrics] abstract class IORuntimeMetricsCompanionPlatform { + this: IORuntimeMetrics.type => + + private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics = { + val _ = ec + new IORuntimeMetrics { + private[effect] val cpuStarvationSampler: CpuStarvationSampler = + CpuStarvationSampler() + + val cpuStarvation: CpuStarvationMetrics = + CpuStarvationMetrics(cpuStarvationSampler) + } + } + +} diff --git a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala similarity index 71% rename from core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala rename to core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala index 920311d7f9..91ece3add2 100644 --- a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala @@ -14,14 +14,6 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics -import cats.effect.IO - -import scala.concurrent.duration.FiniteDuration - -private[effect] trait CpuStarvationMetrics { - def incCpuStarvationCount: IO[Unit] - - def recordClockDrift(drift: FiniteDuration): IO[Unit] -} +private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => } diff --git a/core/js/src/main/scala/cats/effect/IOApp.scala b/core/js/src/main/scala/cats/effect/IOApp.scala index 332de3ac9c..c95c2189b8 100644 --- a/core/js/src/main/scala/cats/effect/IOApp.scala +++ b/core/js/src/main/scala/cats/effect/IOApp.scala @@ -16,7 +16,7 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.effect.std.Console import cats.effect.tracing.TracingConstants._ @@ -260,7 +260,7 @@ trait IOApp { val fiber = Spawn[IO] .raceOutcome[ExitCode, Nothing]( CpuStarvationCheck - .run(runtimeConfig, JsCpuStarvationMetrics(), onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background .surround(run(argList)), keepAlive) diff --git a/core/jvm/src/main/scala/cats/effect/IOApp.scala b/core/jvm/src/main/scala/cats/effect/IOApp.scala index 7b4e4bab59..e9c1571712 100644 --- a/core/jvm/src/main/scala/cats/effect/IOApp.scala +++ b/core/jvm/src/main/scala/cats/effect/IOApp.scala @@ -451,10 +451,10 @@ trait IOApp { val queue = this.queue val fiber = - JvmCpuStarvationMetrics() - .flatMap { cpuStarvationMetrics => + JvmCpuStarvationMetrics(runtime.metrics.cpuStarvationSampler) + .flatMap { _ => CpuStarvationCheck - .run(runtimeConfig, cpuStarvationMetrics, onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background } .surround(ioa) diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala b/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala deleted file mode 100644 index 08164fffad..0000000000 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2020-2024 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect.metrics - -import cats.effect.IO - -import scala.concurrent.duration.FiniteDuration - -import java.util.concurrent.atomic.AtomicLong - -private[metrics] class CpuStarvation private ( - counter: AtomicLong, - currentClockDrift: AtomicLong, - maxClockDrift: AtomicLong) - extends CpuStarvationMBean { - - override def getCpuStarvationCount(): Long = counter.get() - - override def getMaxClockDriftMs(): Long = maxClockDrift.get() - - override def getCurrentClockDriftMs(): Long = currentClockDrift.get() - - def incStarvationCount: IO[Unit] = IO.delay(counter.incrementAndGet()).void - - def recordDrift(drift: FiniteDuration): IO[Unit] = { - val driftMs = drift.toMillis - - val maxDrift = - if (driftMs > 0) IO.delay(maxClockDrift.updateAndGet(math.max(_, driftMs))).void - else IO.unit - - IO.delay(currentClockDrift.set(driftMs)) >> maxDrift - } - -} - -private[metrics] object CpuStarvation { - private[metrics] def apply(): IO[CpuStarvation] = for { - counter <- IO.delay(new AtomicLong(0)) - currentClockDrift <- IO.delay(new AtomicLong(0)) - maxClockDrift <- IO.delay(new AtomicLong(0)) - } yield new CpuStarvation(counter, currentClockDrift, maxClockDrift) -} diff --git a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala index d52a20aa3f..7a0768ab63 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala @@ -18,23 +18,17 @@ package cats.effect.metrics import cats.effect.{IO, Resource} import cats.effect.std.Console - -import scala.concurrent.duration.FiniteDuration +import cats.effect.unsafe.metrics.{CpuStarvation, CpuStarvationSampler} +import cats.syntax.functor._ import java.io.{PrintWriter, StringWriter} import java.lang.management.ManagementFactory import javax.management.{MBeanServer, ObjectName} -private[effect] class JvmCpuStarvationMetrics private (mbean: CpuStarvation) - extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = mbean.incStarvationCount - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = mbean.recordDrift(drift) -} - private[effect] object JvmCpuStarvationMetrics { - private[this] val mBeanObjectName = new ObjectName("cats.effect.metrics:type=CpuStarvation") + private[this] val mBeanObjectName = new ObjectName( + "cats.effect.unsafe.metrics:type=CpuStarvation") private[this] def warning(th: Throwable) = { val exceptionWriter = new StringWriter() @@ -46,28 +40,18 @@ private[effect] object JvmCpuStarvationMetrics { |""".stripMargin } - private[this] class NoOpCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit - } - - private[effect] def apply(): Resource[IO, CpuStarvationMetrics] = { - val acquire: IO[(MBeanServer, JvmCpuStarvationMetrics)] = for { + private[effect] def apply(metrics: CpuStarvationSampler): Resource[IO, Unit] = { + val acquire: IO[MBeanServer] = for { mBeanServer <- IO.delay(ManagementFactory.getPlatformMBeanServer) - mBean <- CpuStarvation() + mBean <- IO.pure(new CpuStarvation(metrics)) // To allow user-defined program to use the compute pool from the beginning, // here we use `IO.delay` rather than `IO.blocking`. _ <- IO.delay(mBeanServer.registerMBean(mBean, mBeanObjectName)) - } yield (mBeanServer, new JvmCpuStarvationMetrics(mBean)) + } yield mBeanServer Resource - .make(acquire) { - case (mbeanServer, _) => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName)) - } - .map(_._2) - .handleErrorWith[CpuStarvationMetrics] { th => - Resource.eval(Console[IO].errorln(warning(th))).map(_ => new NoOpCpuStarvationMetrics) - } + .make(acquire)(mbeanServer => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName))) + .void + .handleErrorWith[Unit](th => Resource.eval(Console[IO].errorln(warning(th)))) } } diff --git a/core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala similarity index 58% rename from core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala index dc227de3ba..c85d69f845 100644 --- a/core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala @@ -14,18 +14,18 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics -import cats.effect.IO +private[effect] final class CpuStarvation( + sampler: CpuStarvationSampler +) extends CpuStarvationMBean { -import scala.concurrent.duration.FiniteDuration + def getCpuStarvationCount(): Long = + sampler.cpuStarvationCount() -private[effect] class JsCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit + def getMaxClockDriftMs(): Long = + sampler.clockDriftMaxMs() - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit -} - -private[effect] object JsCpuStarvationMetrics { - private[effect] def apply(): CpuStarvationMetrics = new JsCpuStarvationMetrics + def getCurrentClockDriftMs(): Long = + sampler.clockDriftCurrentMs() } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala similarity index 93% rename from core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala index ee61c22344..6066e2470e 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala @@ -14,12 +14,12 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics /** * An MBean interfaces for monitoring when CPU starvation occurs. */ -private[metrics] trait CpuStarvationMBean { +private[unsafe] trait CpuStarvationMBean { /** * Returns the number of times CPU starvation has occurred. diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala new file mode 100644 index 0000000000..417008e277 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +import scala.concurrent.ExecutionContext + +private[metrics] abstract class IORuntimeMetricsCompanionPlatform { + this: IORuntimeMetrics.type => + + private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics = + new IORuntimeMetrics { + private[effect] val cpuStarvationSampler: CpuStarvationSampler = + CpuStarvationSampler() + + val cpuStarvation: CpuStarvationMetrics = + CpuStarvationMetrics(cpuStarvationSampler) + + val workStealingThreadPool: Option[WorkStealingPoolMetrics] = + WorkStealingPoolMetrics(ec) + } + +} diff --git a/core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala similarity index 57% rename from core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala index 8263751437..2da5a2d6bf 100644 --- a/core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala @@ -14,18 +14,18 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics -import cats.effect.IO +private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => -import scala.concurrent.duration.FiniteDuration - -private[effect] class NativeCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit -} - -private[effect] object NativeCpuStarvationMetrics { - private[effect] def apply(): CpuStarvationMetrics = new NativeCpuStarvationMetrics + /** + * Returns work-stealing thread pool metrics. + * + * @example + * {{{ + * val runtime: IORuntime = ??? + * val totalWorkers = runtime.metrics.workStealingThreadPool.map(_.compute.workerThreadCount()) + * }}} + */ + def workStealingThreadPool: Option[WorkStealingPoolMetrics] } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala new file mode 100644 index 0000000000..427d3b3708 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala @@ -0,0 +1,250 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe +package metrics + +import scala.concurrent.ExecutionContext + +/** + * Represents metrics associated with a work-stealing thread pool. + */ +sealed trait WorkStealingPoolMetrics { + + /** + * The hash code of the instrumented work-stealing thread pool. This hash uniquely identifies + * the specific thread pool. + */ + def hash: String + + /** + * Compute-specific metrics of the work-stealing thread pool. + */ + def compute: WorkStealingPoolMetrics.ComputeMetrics + + /** + * The list of queue-specific metrics of the work-stealing thread pool. + */ + def localQueues: List[WorkStealingPoolMetrics.LocalQueueMetrics] +} + +object WorkStealingPoolMetrics { + + /** + * The compute metrics of the work-stealing thread pool (WSTP). + */ + sealed trait ComputeMetrics { + + /** + * The number of worker thread instances backing the work-stealing thread pool (WSTP). + * + * @note + * this is a fixed value, as the WSTP has a fixed number of worker threads. + */ + def workerThreadCount(): Int + + /** + * The number of active worker thread instances currently executing fibers on the compute + * thread pool. + * + * @note + * the value may differ between invocations + */ + def activeThreadCount(): Int + + /** + * The number of worker thread instances currently searching for fibers to steal from other + * worker threads. + * + * @note + * the value may differ between invocations + */ + def searchingThreadCount(): Int + + /** + * The number of worker thread instances that can run blocking actions on the compute thread + * pool. + * + * @note + * the value may differ between invocations + */ + def blockedWorkerThreadCount(): Int + + /** + * The total number of fibers enqueued on all local queues. + * + * @note + * the value may differ between invocations + */ + def localQueueFiberCount(): Long + + /** + * The number of fibers which are currently asynchronously suspended. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @note + * the value may differ between invocations + */ + def suspendedFiberCount(): Long + } + + /** + * The metrics of the local queue. + */ + sealed trait LocalQueueMetrics { + + /** + * The index of the LocalQueue. + */ + def index: Int + + /** + * The current number of enqueued fibers. + * + * @note + * the value may differ between invocations + */ + def fiberCount(): Int + + /** + * The total number of fibers enqueued during the lifetime of the local queue. + * + * @note + * the value may differ between invocations + */ + def totalFiberCount(): Long + + /** + * The total number of fibers spilt over to the external queue. + * + * @note + * the value may differ between invocations + */ + def totalSpilloverCount(): Long + + /** + * The total number of successful steal attempts by other worker threads. + * + * @note + * the value may differ between invocations + */ + def successfulStealAttemptCount(): Long + + /** + * The total number of stolen fibers by other worker threads. + * + * @note + * the value may differ between invocations + */ + def stolenFiberCount(): Long + + /** + * The index that represents the head of the queue. + * + * @note + * the value may differ between invocations + */ + def headIndex(): Int + + /** + * The index that represents the tail of the queue. + * + * @note + * the value may differ between invocations + */ + def tailIndex(): Int + + /** + * The "real" value of the head of the local queue. This value represents the state of the + * head which is valid for the owner worker thread. This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ + def realHeadTag(): Int + + /** + * The "steal" tag of the head of the local queue. This value represents the state of the + * head which is valid for any worker thread looking to steal work from this local queue. + * This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ + def stealHeadTag(): Int + + /** + * The "tail" tag of the tail of the local queue. This value represents the state of the + * tail which is valid for the owner worker thread, used for enqueuing fibers into the local + * queue, as well as any other worker thread looking to steal work from this local queue, + * used for calculating how many fibers to steal. This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ + def tailTag(): Int + } + + private[metrics] def apply(ec: ExecutionContext): Option[WorkStealingPoolMetrics] = + ec match { + case wstp: WorkStealingThreadPool[_] => + val metrics = new WorkStealingPoolMetrics { + val hash: String = + System.identityHashCode(wstp).toHexString + + val compute: ComputeMetrics = + computeMetrics(wstp) + + val localQueues: List[LocalQueueMetrics] = + wstp.localQueues.toList.zipWithIndex.map { + case (queue, idx) => localQueueMetrics(queue, idx) + } + } + + Some(metrics) + + case _ => + None + } + + private def computeMetrics(wstp: WorkStealingThreadPool[_]): ComputeMetrics = + new ComputeMetrics { + def workerThreadCount(): Int = wstp.getWorkerThreadCount() + def activeThreadCount(): Int = wstp.getActiveThreadCount() + def searchingThreadCount(): Int = wstp.getSearchingThreadCount() + def blockedWorkerThreadCount(): Int = wstp.getBlockedWorkerThreadCount() + def localQueueFiberCount(): Long = wstp.getLocalQueueFiberCount() + def suspendedFiberCount(): Long = wstp.getSuspendedFiberCount() + } + + private def localQueueMetrics(queue: LocalQueue, idx: Int): LocalQueueMetrics = + new LocalQueueMetrics { + def index: Int = idx + def fiberCount(): Int = queue.getFiberCount() + def totalFiberCount(): Long = queue.getTotalFiberCount() + def totalSpilloverCount(): Long = queue.getTotalSpilloverCount() + def successfulStealAttemptCount(): Long = queue.getSuccessfulStealAttemptCount() + def stolenFiberCount(): Long = queue.getStolenFiberCount() + def headIndex(): Int = queue.getHeadIndex() + def tailIndex(): Int = queue.getTailIndex() + def realHeadTag(): Int = queue.getRealHeadTag() + def stealHeadTag(): Int = queue.getStealHeadTag() + def tailTag(): Int = queue.getTailTag() + } +} diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 540c6b0c38..c7bf849d8d 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -16,7 +16,7 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationWarningMetrics, NativeCpuStarvationMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.syntax.all._ import scala.concurrent.CancellationException @@ -271,7 +271,7 @@ trait IOApp { else Resource.unit[IO] val starvationChecker = CpuStarvationCheck - .run(runtimeConfig, NativeCpuStarvationMetrics(), onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background Spawn[IO] diff --git a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala index 6a9bbe9c6f..600cc752e1 100644 --- a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala +++ b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala @@ -16,9 +16,10 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationMetrics, CpuStarvationWarningMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.effect.std.Console import cats.effect.unsafe.IORuntimeConfig +import cats.effect.unsafe.metrics.CpuStarvationSampler import cats.syntax.all._ import scala.concurrent.duration.{Duration, FiniteDuration} @@ -27,7 +28,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { def run( runtimeConfig: IORuntimeConfig, - metrics: CpuStarvationMetrics, + sampler: CpuStarvationSampler, onCpuStarvationWarn: CpuStarvationWarningMetrics => IO[Unit]): IO[Nothing] = { import runtimeConfig._ @@ -37,7 +38,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { IO.sleep(cpuStarvationCheckInterval) >> IO.monotonic.flatMap { now => val delta = now - initial - metrics.recordClockDrift(delta - cpuStarvationCheckInterval) >> + sampler.recordClockDrift(delta - cpuStarvationCheckInterval) >> IO.realTime .flatMap(fd => (onCpuStarvationWarn( @@ -45,7 +46,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { fd, delta - cpuStarvationCheckInterval, cpuStarvationCheckThreshold, - cpuStarvationCheckInterval)) *> metrics.incCpuStarvationCount) + cpuStarvationCheckInterval)) *> sampler.incCpuStarvationCount) .whenA(delta >= threshold)) >> go(now) } diff --git a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala index 0aa6b79cb5..64d1f34c76 100644 --- a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala @@ -18,6 +18,7 @@ package cats.effect package unsafe import cats.effect.Platform.static +import cats.effect.unsafe.metrics.IORuntimeMetrics import scala.concurrent.ExecutionContext @@ -43,7 +44,8 @@ final class IORuntime private[unsafe] ( private[effect] val pollers: List[Any], private[effect] val fiberMonitor: FiberMonitor, val shutdown: () => Unit, - val config: IORuntimeConfig + val config: IORuntimeConfig, + val metrics: IORuntimeMetrics ) { private[effect] val fiberErrorCbs: StripedHashtable = new StripedHashtable() @@ -70,6 +72,7 @@ object IORuntime extends IORuntimeCompanionPlatform { config: IORuntimeConfig): IORuntime = { val fiberMonitor = FiberMonitor(compute) val unregister = registerFiberMonitorMBean(fiberMonitor) + val metrics = IORuntimeMetrics(compute) def unregisterAndShutdown: () => Unit = () => { unregister() shutdown() @@ -84,7 +87,8 @@ object IORuntime extends IORuntimeCompanionPlatform { pollers, fiberMonitor, unregisterAndShutdown, - config) + config, + metrics) allRuntimes.put(runtime, runtime.hashCode()) runtime } @@ -104,14 +108,19 @@ object IORuntime extends IORuntimeCompanionPlatform { scheduler: Scheduler, fiberMonitor: FiberMonitor, shutdown: () => Unit, - config: IORuntimeConfig): IORuntime = - new IORuntime(compute, blocking, scheduler, Nil, fiberMonitor, shutdown, config) + config: IORuntimeConfig): IORuntime = { + val metrics = IORuntimeMetrics(compute) + new IORuntime(compute, blocking, scheduler, Nil, fiberMonitor, shutdown, config, metrics) + } def builder(): IORuntimeBuilder = IORuntimeBuilder() - private[effect] def testRuntime(ec: ExecutionContext, scheduler: Scheduler): IORuntime = - new IORuntime(ec, ec, scheduler, Nil, new NoOpFiberMonitor(), () => (), IORuntimeConfig()) + private[effect] def testRuntime(ec: ExecutionContext, scheduler: Scheduler): IORuntime = { + val config = IORuntimeConfig() + val metrics = IORuntimeMetrics(ec) + new IORuntime(ec, ec, scheduler, Nil, new NoOpFiberMonitor(), () => (), config, metrics) + } @static private[effect] final val allRuntimes: ThreadSafeHashtable[IORuntime] = new ThreadSafeHashtable(4) diff --git a/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala new file mode 100644 index 0000000000..a4f46c6c62 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala @@ -0,0 +1,62 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +import scala.concurrent.duration._ + +sealed trait CpuStarvationMetrics { + + /** + * Returns the current number of times CPU starvation has occurred. + * + * @note + * the value may differ between invocations + */ + def starvationCount(): Long + + /** + * Returns the current (last) observed clock drift. + * + * @note + * the value may differ between invocations + */ + def clockDriftCurrent(): FiniteDuration + + /** + * Returns the maximum clock drift observed since the launch. + * + * @note + * the value may differ between invocations + */ + def clockDriftMax(): FiniteDuration +} + +object CpuStarvationMetrics { + + private[metrics] def apply(sampler: CpuStarvationSampler): CpuStarvationMetrics = + new CpuStarvationMetrics { + def starvationCount(): Long = + sampler.cpuStarvationCount() + + def clockDriftCurrent(): FiniteDuration = + sampler.clockDriftCurrentMs().millis + + def clockDriftMax(): FiniteDuration = + sampler.clockDriftMaxMs().millis + } + +} diff --git a/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala new file mode 100644 index 0000000000..574b58a1a0 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +import cats.effect.IO + +import scala.concurrent.duration.FiniteDuration + +import java.util.concurrent.atomic.AtomicLong + +private[effect] final class CpuStarvationSampler private ( + counter: AtomicLong, + clockDriftCurrent: AtomicLong, + clockDriftMax: AtomicLong +) { + def cpuStarvationCount(): Long = + counter.get() + + def clockDriftCurrentMs(): Long = + clockDriftCurrent.get() + + def clockDriftMaxMs(): Long = + clockDriftMax.get() + + def incCpuStarvationCount: IO[Unit] = + IO.delay { + counter.incrementAndGet() + () + } + + def recordClockDrift(drift: FiniteDuration): IO[Unit] = { + val driftMs = drift.toMillis + + val maxDrift = + if (driftMs > 0) IO.delay { + clockDriftMax.updateAndGet(math.max(_, driftMs)) + () + } + else IO.unit + + IO.delay(clockDriftCurrent.set(driftMs)) >> maxDrift + } + +} + +private[effect] object CpuStarvationSampler { + + private[effect] def apply(): CpuStarvationSampler = + new CpuStarvationSampler(new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)) + +} diff --git a/core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala new file mode 100644 index 0000000000..bfecdb0e8e --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe.metrics + +/** + * The runtime-specific metrics. + */ +trait IORuntimeMetrics extends IORuntimeMetricsPlatform { + + /** + * Returns starvation-specific metrics. + * + * @example + * {{{ + * val runtime: IORuntime = ??? + * val maxDrift = runtime.metrics.cpuStarvation.clockDriftMax() + * }}} + */ + def cpuStarvation: CpuStarvationMetrics + + private[effect] def cpuStarvationSampler: CpuStarvationSampler +} + +object IORuntimeMetrics extends IORuntimeMetricsCompanionPlatform