From f7b07e85dc227a26d9ba3ac18de7dea1be74648c Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Fri, 19 Jul 2024 14:25:48 +0300 Subject: [PATCH 1/4] Implement `IORuntimeMetrics` --- build.sbt | 19 +++- .../IORuntimeMetricsCompanionPlatform.scala | 34 ++++++ .../metrics/IORuntimeMetricsPlatform.scala} | 14 +-- .../js/src/main/scala/cats/effect/IOApp.scala | 4 +- .../src/main/scala/cats/effect/IOApp.scala | 6 +- .../cats/effect/metrics/CpuStarvation.scala | 43 ++------ .../effect/metrics/CpuStarvationMbean.scala | 16 +++ .../IORuntimeMetricsCompanionPlatform.scala | 36 +++++++ .../metrics/IORuntimeMetricsPlatform.scala} | 12 +-- .../metrics/JvmCpuStarvationMetrics.scala | 34 ++---- .../metrics/WorkStealingPoolMetrics.scala | 102 ++++++++++++++++++ .../scala/cats/effect/unsafe/LocalQueue.scala | 2 +- .../unsafe/WorkStealingThreadPool.scala | 14 +-- .../src/main/scala/cats/effect/IOApp.scala | 4 +- .../cats/effect/CpuStarvationCheck.scala | 8 +- .../effect/metrics/CpuStarvationMetrics.scala | 45 +++++++- .../effect/metrics/CpuStarvationSampler.scala | 65 +++++++++++ .../effect/metrics/IORuntimeMetrics.scala | 38 +++++++ .../scala/cats/effect/unsafe/IORuntime.scala | 21 ++-- 19 files changed, 400 insertions(+), 117 deletions(-) create mode 100644 core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala rename core/{native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala => js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala} (59%) create mode 100644 core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala rename core/{js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala => jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala} (60%) create mode 100644 core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala create mode 100644 core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala create mode 100644 core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala diff --git a/build.sbt b/build.sbt index 21f88cf130..29ebb5742e 100644 --- a/build.sbt +++ b/build.sbt @@ -668,7 +668,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.reset"), ProblemFilters.exclude[ReversedMissingMethodProblem]("cats.effect.IOLocal.lens"), // this filter is particulary terrible, because it can also mask real issues :( - ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens") + ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions @@ -835,7 +838,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[Problem]("cats.effect.CallbackStackOps.*"), // introduced by #3695, which ported fiber monitoring to Native // internal API change - ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.ES2021FiberMonitor"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JsCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.JsCpuStarvationMetrics$") ) }, mimaBinaryIssueFilters ++= { @@ -870,7 +878,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[MissingClassProblem]( "cats.effect.unsafe.PollingExecutorScheduler$SleepTask"), ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler"), - ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$") + ProblemFilters.exclude[MissingClassProblem]("cats.effect.unsafe.QueueExecutorScheduler$"), + // internal API change, makes CpuStarvationMetrics available on all platforms + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.NativeCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.metrics.NativeCpuStarvationMetrics$") ) ) diff --git a/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala new file mode 100644 index 0000000000..2c0d73a68f --- /dev/null +++ b/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.metrics + +import scala.concurrent.ExecutionContext + +private[metrics] abstract class IORuntimeMetricsCompanionPlatform { + this: IORuntimeMetrics.type => + + private[effect] def apply(ec: ExecutionContext): IORuntimeMetrics = { + val _ = ec + new IORuntimeMetrics { + private[effect] val cpuStarvationSampler: CpuStarvationSampler = + CpuStarvationSampler() + + val cpuStarvation: CpuStarvationMetrics = + CpuStarvationMetrics(cpuStarvationSampler) + } + } +} diff --git a/core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala b/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala similarity index 59% rename from core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala rename to core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala index 8263751437..70203ffcbb 100644 --- a/core/native/src/main/scala/cats/effect/metrics/NativeCpuStarvationMetrics.scala +++ b/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala @@ -16,16 +16,4 @@ package cats.effect.metrics -import cats.effect.IO - -import scala.concurrent.duration.FiniteDuration - -private[effect] class NativeCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit -} - -private[effect] object NativeCpuStarvationMetrics { - private[effect] def apply(): CpuStarvationMetrics = new NativeCpuStarvationMetrics -} +private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => } diff --git a/core/js/src/main/scala/cats/effect/IOApp.scala b/core/js/src/main/scala/cats/effect/IOApp.scala index 332de3ac9c..c95c2189b8 100644 --- a/core/js/src/main/scala/cats/effect/IOApp.scala +++ b/core/js/src/main/scala/cats/effect/IOApp.scala @@ -16,7 +16,7 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.effect.std.Console import cats.effect.tracing.TracingConstants._ @@ -260,7 +260,7 @@ trait IOApp { val fiber = Spawn[IO] .raceOutcome[ExitCode, Nothing]( CpuStarvationCheck - .run(runtimeConfig, JsCpuStarvationMetrics(), onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background .surround(run(argList)), keepAlive) diff --git a/core/jvm/src/main/scala/cats/effect/IOApp.scala b/core/jvm/src/main/scala/cats/effect/IOApp.scala index 7b4e4bab59..e9c1571712 100644 --- a/core/jvm/src/main/scala/cats/effect/IOApp.scala +++ b/core/jvm/src/main/scala/cats/effect/IOApp.scala @@ -451,10 +451,10 @@ trait IOApp { val queue = this.queue val fiber = - JvmCpuStarvationMetrics() - .flatMap { cpuStarvationMetrics => + JvmCpuStarvationMetrics(runtime.metrics.cpuStarvationSampler) + .flatMap { _ => CpuStarvationCheck - .run(runtimeConfig, cpuStarvationMetrics, onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background } .surround(ioa) diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala b/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala index 08164fffad..ac70947070 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala @@ -16,42 +16,15 @@ package cats.effect.metrics -import cats.effect.IO +private final class CpuStarvation(sampler: CpuStarvationSampler) extends CpuStarvationMBean { + def getCpuStarvationCount(): Long = + sampler.cpuStarvationCount() -import scala.concurrent.duration.FiniteDuration - -import java.util.concurrent.atomic.AtomicLong - -private[metrics] class CpuStarvation private ( - counter: AtomicLong, - currentClockDrift: AtomicLong, - maxClockDrift: AtomicLong) - extends CpuStarvationMBean { - - override def getCpuStarvationCount(): Long = counter.get() - - override def getMaxClockDriftMs(): Long = maxClockDrift.get() - - override def getCurrentClockDriftMs(): Long = currentClockDrift.get() - - def incStarvationCount: IO[Unit] = IO.delay(counter.incrementAndGet()).void - - def recordDrift(drift: FiniteDuration): IO[Unit] = { - val driftMs = drift.toMillis - - val maxDrift = - if (driftMs > 0) IO.delay(maxClockDrift.updateAndGet(math.max(_, driftMs))).void - else IO.unit - - IO.delay(currentClockDrift.set(driftMs)) >> maxDrift - } + def getMaxClockDriftMs(): Long = + sampler.clockDriftMaxMs() + def getCurrentClockDriftMs(): Long = + sampler.clockDriftCurrentMs() } -private[metrics] object CpuStarvation { - private[metrics] def apply(): IO[CpuStarvation] = for { - counter <- IO.delay(new AtomicLong(0)) - currentClockDrift <- IO.delay(new AtomicLong(0)) - maxClockDrift <- IO.delay(new AtomicLong(0)) - } yield new CpuStarvation(counter, currentClockDrift, maxClockDrift) -} +private object CpuStarvation diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala b/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala index ee61c22344..b203ede6b7 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala @@ -45,3 +45,19 @@ private[metrics] trait CpuStarvationMBean { */ def getCurrentClockDriftMs(): Long } + +private[metrics] object CpuStarvationMBean { + + private[metrics] def apply(metrics: CpuStarvationSampler): CpuStarvationMBean = + new CpuStarvationMBean { + def getCpuStarvationCount(): Long = + metrics.cpuStarvationCount() + + def getMaxClockDriftMs(): Long = + metrics.clockDriftMaxMs() + + def getCurrentClockDriftMs(): Long = + metrics.clockDriftCurrentMs() + } + +} diff --git a/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala new file mode 100644 index 0000000000..b03deeb3c1 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.metrics + +import scala.concurrent.ExecutionContext + +private[metrics] abstract class IORuntimeMetricsCompanionPlatform { + this: IORuntimeMetrics.type => + + private[effect] def apply(ec: ExecutionContext): IORuntimeMetrics = + new IORuntimeMetrics { + private[effect] val cpuStarvationSampler: CpuStarvationSampler = + CpuStarvationSampler() + + val cpuStarvation: CpuStarvationMetrics = + CpuStarvationMetrics(cpuStarvationSampler) + + val workStealingThreadPool: Option[WorkStealingPoolMetrics] = + WorkStealingPoolMetrics(ec) + } + +} diff --git a/core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala similarity index 60% rename from core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala rename to core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala index dc227de3ba..3e2fd08314 100644 --- a/core/js/src/main/scala/cats/effect/metrics/JsCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala @@ -16,16 +16,8 @@ package cats.effect.metrics -import cats.effect.IO +private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => -import scala.concurrent.duration.FiniteDuration + def workStealingThreadPool: Option[WorkStealingPoolMetrics] -private[effect] class JsCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit -} - -private[effect] object JsCpuStarvationMetrics { - private[effect] def apply(): CpuStarvationMetrics = new JsCpuStarvationMetrics } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala index d52a20aa3f..b6adedf7fd 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala @@ -18,21 +18,13 @@ package cats.effect.metrics import cats.effect.{IO, Resource} import cats.effect.std.Console - -import scala.concurrent.duration.FiniteDuration +import cats.syntax.functor._ import java.io.{PrintWriter, StringWriter} import java.lang.management.ManagementFactory import javax.management.{MBeanServer, ObjectName} -private[effect] class JvmCpuStarvationMetrics private (mbean: CpuStarvation) - extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = mbean.incStarvationCount - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = mbean.recordDrift(drift) -} - private[effect] object JvmCpuStarvationMetrics { private[this] val mBeanObjectName = new ObjectName("cats.effect.metrics:type=CpuStarvation") @@ -46,28 +38,18 @@ private[effect] object JvmCpuStarvationMetrics { |""".stripMargin } - private[this] class NoOpCpuStarvationMetrics extends CpuStarvationMetrics { - override def incCpuStarvationCount: IO[Unit] = IO.unit - - override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit - } - - private[effect] def apply(): Resource[IO, CpuStarvationMetrics] = { - val acquire: IO[(MBeanServer, JvmCpuStarvationMetrics)] = for { + private[effect] def apply(metrics: CpuStarvationSampler): Resource[IO, Unit] = { + val acquire: IO[MBeanServer] = for { mBeanServer <- IO.delay(ManagementFactory.getPlatformMBeanServer) - mBean <- CpuStarvation() + mBean <- IO.pure(new CpuStarvation(metrics)) // To allow user-defined program to use the compute pool from the beginning, // here we use `IO.delay` rather than `IO.blocking`. _ <- IO.delay(mBeanServer.registerMBean(mBean, mBeanObjectName)) - } yield (mBeanServer, new JvmCpuStarvationMetrics(mBean)) + } yield mBeanServer Resource - .make(acquire) { - case (mbeanServer, _) => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName)) - } - .map(_._2) - .handleErrorWith[CpuStarvationMetrics] { th => - Resource.eval(Console[IO].errorln(warning(th))).map(_ => new NoOpCpuStarvationMetrics) - } + .make(acquire)(mbeanServer => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName))) + .void + .handleErrorWith[Unit](th => Resource.eval(Console[IO].errorln(warning(th)))) } } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala new file mode 100644 index 0000000000..45cf2ab201 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala @@ -0,0 +1,102 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.metrics + +import cats.effect.unsafe.{LocalQueue, WorkStealingThreadPool} + +import scala.concurrent.ExecutionContext + +sealed trait WorkStealingPoolMetrics { + def hash: String + + def compute: WorkStealingPoolMetrics.ComputeMetrics + + def localQueues: List[WorkStealingPoolMetrics.LocalQueueMetrics] +} + +object WorkStealingPoolMetrics { + + sealed trait ComputeMetrics { + def workerThreadCount(): Int + def activeThreadCount(): Int + def searchingThreadCount(): Int + def blockedWorkerThreadCount(): Int + def localQueueFiberCount(): Long + def suspendedFiberCount(): Long + } + + sealed trait LocalQueueMetrics { + def index: Int + def fiberCount(): Int + def headIndex(): Int + def tailIndex(): Int + def totalFiberCount(): Long + def totalSpilloverCount(): Long + def successfulStealAttemptCount(): Long + def stolenFiberCount(): Long + def realHeadTag(): Int + def stealHeadTag(): Int + def tailTag(): Int + } + + private[metrics] def apply(ec: ExecutionContext): Option[WorkStealingPoolMetrics] = + ec match { + case wstp: WorkStealingThreadPool[_] => + val metrics = new WorkStealingPoolMetrics { + val hash: String = + System.identityHashCode(wstp).toHexString + + val compute: ComputeMetrics = + computeMetrics(wstp) + + val localQueues: List[LocalQueueMetrics] = + wstp.localQueues.toList.zipWithIndex.map { + case (queue, idx) => localQueueMetrics(queue, idx) + } + } + + Some(metrics) + + case _ => + None + } + + private def computeMetrics(wstp: WorkStealingThreadPool[_]): ComputeMetrics = + new ComputeMetrics { + def workerThreadCount(): Int = wstp.getWorkerThreadCount() + def activeThreadCount(): Int = wstp.getActiveThreadCount() + def searchingThreadCount(): Int = wstp.getSearchingThreadCount() + def blockedWorkerThreadCount(): Int = wstp.getBlockedWorkerThreadCount() + def localQueueFiberCount(): Long = wstp.getLocalQueueFiberCount() + def suspendedFiberCount(): Long = wstp.getSuspendedFiberCount() + } + + private def localQueueMetrics(queue: LocalQueue, idx: Int): LocalQueueMetrics = + new LocalQueueMetrics { + def index: Int = idx + def fiberCount(): Int = queue.getFiberCount() + def headIndex(): Int = queue.getHeadIndex() + def tailIndex(): Int = queue.getTailIndex() + def totalFiberCount(): Long = queue.getTotalFiberCount() + def totalSpilloverCount(): Long = queue.getTotalSpilloverCount() + def successfulStealAttemptCount(): Long = queue.getSuccessfulStealAttemptCount() + def stolenFiberCount(): Long = queue.getStolenFiberCount() + def realHeadTag(): Int = queue.getRealHeadTag() + def stealHeadTag(): Int = queue.getStealHeadTag() + def tailTag(): Int = queue.getTailTag() + } +} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala index 8b0a6b0e91..ecc4a0df49 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala @@ -119,7 +119,7 @@ import java.util.concurrent.ThreadLocalRandom * `Unsafe`. And `Unsafe` is only really needed on JVM 8. JVM 9+ introduce much richer and * better APIs and tools for building high-performance concurrent systems (e.g. `VarHandle`). */ -private final class LocalQueue extends LocalQueuePadding { +private[effect] final class LocalQueue extends LocalQueuePadding { import LocalQueueConstants._ import TracingConstants._ diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index daa9e0f765..a0e985395e 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -81,7 +81,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * References to worker threads and their local queues. */ private[this] val workerThreads: Array[WorkerThread[P]] = new Array(threadCount) - private[unsafe] val localQueues: Array[LocalQueue] = new Array(threadCount) + private[effect] val localQueues: Array[LocalQueue] = new Array(threadCount) private[unsafe] val sleepers: Array[TimerHeap] = new Array(threadCount) private[unsafe] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount) private[unsafe] val fiberBags: Array[WeakBag[Runnable]] = new Array(threadCount) @@ -778,7 +778,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of worker threads backing the compute pool */ - private[unsafe] def getWorkerThreadCount(): Int = + private[effect] def getWorkerThreadCount(): Int = threadCount /** @@ -788,7 +788,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of active worker threads */ - private[unsafe] def getActiveThreadCount(): Int = { + private[effect] def getActiveThreadCount(): Int = { val st = state.get() (st & UnparkMask) >>> UnparkShift } @@ -800,7 +800,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of worker threads searching for work */ - private[unsafe] def getSearchingThreadCount(): Int = { + private[effect] def getSearchingThreadCount(): Int = { val st = state.get() st & SearchMask } @@ -812,7 +812,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of blocked worker threads */ - private[unsafe] def getBlockedWorkerThreadCount(): Int = + private[effect] def getBlockedWorkerThreadCount(): Int = blockedWorkerThreadCounter.get() /** @@ -821,7 +821,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the total number of fibers enqueued on all local queues */ - private[unsafe] def getLocalQueueFiberCount(): Long = + private[effect] def getLocalQueueFiberCount(): Long = localQueues.map(_.size().toLong).sum /** @@ -834,7 +834,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of asynchronously suspended fibers */ - private[unsafe] def getSuspendedFiberCount(): Long = + private[effect] def getSuspendedFiberCount(): Long = workerThreads.map(_.getSuspendedFiberCount().toLong).sum } diff --git a/core/native/src/main/scala/cats/effect/IOApp.scala b/core/native/src/main/scala/cats/effect/IOApp.scala index 540c6b0c38..c7bf849d8d 100644 --- a/core/native/src/main/scala/cats/effect/IOApp.scala +++ b/core/native/src/main/scala/cats/effect/IOApp.scala @@ -16,7 +16,7 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationWarningMetrics, NativeCpuStarvationMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.syntax.all._ import scala.concurrent.CancellationException @@ -271,7 +271,7 @@ trait IOApp { else Resource.unit[IO] val starvationChecker = CpuStarvationCheck - .run(runtimeConfig, NativeCpuStarvationMetrics(), onCpuStarvationWarn) + .run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn) .background Spawn[IO] diff --git a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala index 6a9bbe9c6f..161068bef1 100644 --- a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala +++ b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala @@ -16,7 +16,7 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationMetrics, CpuStarvationWarningMetrics} +import cats.effect.metrics.{CpuStarvationSampler, CpuStarvationWarningMetrics} import cats.effect.std.Console import cats.effect.unsafe.IORuntimeConfig import cats.syntax.all._ @@ -27,7 +27,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { def run( runtimeConfig: IORuntimeConfig, - metrics: CpuStarvationMetrics, + sampler: CpuStarvationSampler, onCpuStarvationWarn: CpuStarvationWarningMetrics => IO[Unit]): IO[Nothing] = { import runtimeConfig._ @@ -37,7 +37,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { IO.sleep(cpuStarvationCheckInterval) >> IO.monotonic.flatMap { now => val delta = now - initial - metrics.recordClockDrift(delta - cpuStarvationCheckInterval) >> + sampler.recordClockDrift(delta - cpuStarvationCheckInterval) >> IO.realTime .flatMap(fd => (onCpuStarvationWarn( @@ -45,7 +45,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform { fd, delta - cpuStarvationCheckInterval, cpuStarvationCheckThreshold, - cpuStarvationCheckInterval)) *> metrics.incCpuStarvationCount) + cpuStarvationCheckInterval)) *> sampler.incCpuStarvationCount) .whenA(delta >= threshold)) >> go(now) } diff --git a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala b/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala index 920311d7f9..94f32128c4 100644 --- a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala +++ b/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala @@ -16,12 +16,47 @@ package cats.effect.metrics -import cats.effect.IO +import scala.concurrent.duration._ -import scala.concurrent.duration.FiniteDuration +sealed trait CpuStarvationMetrics { -private[effect] trait CpuStarvationMetrics { - def incCpuStarvationCount: IO[Unit] + /** + * Returns the current number of times CPU starvation has occurred. + * + * @note + * the value may differ between invocations + */ + def starvationCount(): Long + + /** + * Returns the current (last) observed clock drift. + * + * @note + * the value may differ between invocations + */ + def clockDriftCurrent(): FiniteDuration + + /** + * Returns the maximum clock drift observed since the launch. + * + * @note + * the value may differ between invocations + */ + def clockDriftMax(): FiniteDuration +} + +object CpuStarvationMetrics { + + private[metrics] def apply(sampler: CpuStarvationSampler): CpuStarvationMetrics = + new CpuStarvationMetrics { + def starvationCount(): Long = + sampler.cpuStarvationCount() + + def clockDriftCurrent(): FiniteDuration = + sampler.clockDriftCurrentMs().millis + + def clockDriftMax(): FiniteDuration = + sampler.clockDriftMaxMs().millis + } - def recordClockDrift(drift: FiniteDuration): IO[Unit] } diff --git a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala b/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala new file mode 100644 index 0000000000..ffe4758cb0 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.metrics + +import cats.effect.IO + +import scala.concurrent.duration.FiniteDuration + +import java.util.concurrent.atomic.AtomicLong + +private[effect] final class CpuStarvationSampler private ( + counter: AtomicLong, + clockDriftCurrent: AtomicLong, + clockDriftMax: AtomicLong +) { + def cpuStarvationCount(): Long = + counter.get() + + def clockDriftCurrentMs(): Long = + clockDriftCurrent.get() + + def clockDriftMaxMs(): Long = + clockDriftMax.get() + + def incCpuStarvationCount: IO[Unit] = + IO.delay { + counter.incrementAndGet() + () + } + + def recordClockDrift(drift: FiniteDuration): IO[Unit] = { + val driftMs = drift.toMillis + + val maxDrift = + if (driftMs > 0) IO.delay { + clockDriftMax.updateAndGet(math.max(_, driftMs)) + () + } + else IO.unit + + IO.delay(clockDriftCurrent.set(driftMs)) >> maxDrift + } + +} + +private[effect] object CpuStarvationSampler { + + private[effect] def apply(): CpuStarvationSampler = + new CpuStarvationSampler(new AtomicLong(0), new AtomicLong(0), new AtomicLong(0)) + +} diff --git a/core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala b/core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala new file mode 100644 index 0000000000..098c19b92d --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2024 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.metrics + +/** + * The runtime-specific metrics. + */ +trait IORuntimeMetrics extends IORuntimeMetricsPlatform { + + /** + * Returns starvation-specific metrics. + * + * @example + * {{{ + * val runtime: IORuntime = ??? + * val maxDrift = runtime.metrics.cpuStarvation.clockDriftMax() + * }}} + */ + def cpuStarvation: CpuStarvationMetrics + + private[effect] def cpuStarvationSampler: CpuStarvationSampler +} + +object IORuntimeMetrics extends IORuntimeMetricsCompanionPlatform diff --git a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala index 0aa6b79cb5..3bc9dfc481 100644 --- a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala @@ -18,6 +18,7 @@ package cats.effect package unsafe import cats.effect.Platform.static +import cats.effect.metrics.IORuntimeMetrics import scala.concurrent.ExecutionContext @@ -43,7 +44,8 @@ final class IORuntime private[unsafe] ( private[effect] val pollers: List[Any], private[effect] val fiberMonitor: FiberMonitor, val shutdown: () => Unit, - val config: IORuntimeConfig + val config: IORuntimeConfig, + val metrics: IORuntimeMetrics ) { private[effect] val fiberErrorCbs: StripedHashtable = new StripedHashtable() @@ -70,6 +72,7 @@ object IORuntime extends IORuntimeCompanionPlatform { config: IORuntimeConfig): IORuntime = { val fiberMonitor = FiberMonitor(compute) val unregister = registerFiberMonitorMBean(fiberMonitor) + val metrics = IORuntimeMetrics(compute) def unregisterAndShutdown: () => Unit = () => { unregister() shutdown() @@ -84,7 +87,8 @@ object IORuntime extends IORuntimeCompanionPlatform { pollers, fiberMonitor, unregisterAndShutdown, - config) + config, + metrics) allRuntimes.put(runtime, runtime.hashCode()) runtime } @@ -104,14 +108,19 @@ object IORuntime extends IORuntimeCompanionPlatform { scheduler: Scheduler, fiberMonitor: FiberMonitor, shutdown: () => Unit, - config: IORuntimeConfig): IORuntime = - new IORuntime(compute, blocking, scheduler, Nil, fiberMonitor, shutdown, config) + config: IORuntimeConfig): IORuntime = { + val metrics = IORuntimeMetrics(compute) + new IORuntime(compute, blocking, scheduler, Nil, fiberMonitor, shutdown, config, metrics) + } def builder(): IORuntimeBuilder = IORuntimeBuilder() - private[effect] def testRuntime(ec: ExecutionContext, scheduler: Scheduler): IORuntime = - new IORuntime(ec, ec, scheduler, Nil, new NoOpFiberMonitor(), () => (), IORuntimeConfig()) + private[effect] def testRuntime(ec: ExecutionContext, scheduler: Scheduler): IORuntime = { + val config = IORuntimeConfig() + val metrics = IORuntimeMetrics(ec) + new IORuntime(ec, ec, scheduler, Nil, new NoOpFiberMonitor(), () => (), config, metrics) + } @static private[effect] final val allRuntimes: ThreadSafeHashtable[IORuntime] = new ThreadSafeHashtable(4) From 964ecbf6d1df13d2db272cab5916165f96abe062 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Wed, 31 Jul 2024 10:33:55 +0300 Subject: [PATCH 2/4] Move `IORuntimeMetrics` to `cats.effect.unsafe.metrics` package --- build.sbt | 7 ++++++- .../IORuntimeMetricsCompanionPlatform.scala | 5 +++-- .../metrics/IORuntimeMetricsPlatform.scala | 2 +- .../metrics/JvmCpuStarvationMetrics.scala | 1 + .../scala/cats/effect/unsafe/LocalQueue.scala | 2 +- .../unsafe/WorkStealingThreadPool.scala | 14 ++++++------- .../{ => unsafe}/metrics/CpuStarvation.scala | 9 +++++---- .../metrics/CpuStarvationMBean.scala} | 20 ++----------------- .../IORuntimeMetricsCompanionPlatform.scala | 4 ++-- .../metrics/IORuntimeMetricsPlatform.scala | 4 +--- .../metrics/WorkStealingPoolMetrics.scala | 5 ++--- .../cats/effect/CpuStarvationCheck.scala | 3 ++- .../scala/cats/effect/unsafe/IORuntime.scala | 2 +- .../metrics/CpuStarvationMetrics.scala | 2 +- .../metrics/CpuStarvationSampler.scala | 2 +- .../metrics/IORuntimeMetrics.scala | 2 +- 16 files changed, 37 insertions(+), 47 deletions(-) rename core/js-native/src/main/scala/cats/effect/{ => unsafe}/metrics/IORuntimeMetricsCompanionPlatform.scala (90%) rename core/js-native/src/main/scala/cats/effect/{ => unsafe}/metrics/IORuntimeMetricsPlatform.scala (95%) rename core/jvm/src/main/scala/cats/effect/{ => unsafe}/metrics/CpuStarvation.scala (84%) rename core/jvm/src/main/scala/cats/effect/{metrics/CpuStarvationMbean.scala => unsafe/metrics/CpuStarvationMBean.scala} (71%) rename core/jvm/src/main/scala/cats/effect/{ => unsafe}/metrics/IORuntimeMetricsCompanionPlatform.scala (91%) rename core/jvm/src/main/scala/cats/effect/{ => unsafe}/metrics/IORuntimeMetricsPlatform.scala (95%) rename core/jvm/src/main/scala/cats/effect/{ => unsafe}/metrics/WorkStealingPoolMetrics.scala (97%) rename core/shared/src/main/scala/cats/effect/{ => unsafe}/metrics/CpuStarvationMetrics.scala (97%) rename core/shared/src/main/scala/cats/effect/{ => unsafe}/metrics/CpuStarvationSampler.scala (97%) rename core/shared/src/main/scala/cats/effect/{ => unsafe}/metrics/IORuntimeMetrics.scala (96%) diff --git a/build.sbt b/build.sbt index 29ebb5742e..2f6e97e74d 100644 --- a/build.sbt +++ b/build.sbt @@ -671,7 +671,12 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[DirectMissingMethodProblem]("cats.effect.IOLocal.lens"), // internal API change, makes CpuStarvationMetrics available on all platforms ProblemFilters.exclude[MissingClassProblem]( - "cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics") + "cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMetrics"), + // package-private classes moved to the `cats.effect.unsafe.metrics` package + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvation$"), + ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.CpuStarvationMBean") ) ++ { if (tlIsScala3.value) { // Scala 3 specific exclusions diff --git a/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala similarity index 90% rename from core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala rename to core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala index 2c0d73a68f..724ef3c50c 100644 --- a/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -14,14 +14,14 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics import scala.concurrent.ExecutionContext private[metrics] abstract class IORuntimeMetricsCompanionPlatform { this: IORuntimeMetrics.type => - private[effect] def apply(ec: ExecutionContext): IORuntimeMetrics = { + private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics = { val _ = ec new IORuntimeMetrics { private[effect] val cpuStarvationSampler: CpuStarvationSampler = @@ -31,4 +31,5 @@ private[metrics] abstract class IORuntimeMetricsCompanionPlatform { CpuStarvationMetrics(cpuStarvationSampler) } } + } diff --git a/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala similarity index 95% rename from core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala rename to core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala index 70203ffcbb..91ece3add2 100644 --- a/core/js-native/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala +++ b/core/js-native/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala @@ -14,6 +14,6 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala index b6adedf7fd..0be6767c75 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala @@ -18,6 +18,7 @@ package cats.effect.metrics import cats.effect.{IO, Resource} import cats.effect.std.Console +import cats.effect.unsafe.metrics.{CpuStarvation, CpuStarvationSampler} import cats.syntax.functor._ import java.io.{PrintWriter, StringWriter} diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala index ecc4a0df49..8b0a6b0e91 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/LocalQueue.scala @@ -119,7 +119,7 @@ import java.util.concurrent.ThreadLocalRandom * `Unsafe`. And `Unsafe` is only really needed on JVM 8. JVM 9+ introduce much richer and * better APIs and tools for building high-performance concurrent systems (e.g. `VarHandle`). */ -private[effect] final class LocalQueue extends LocalQueuePadding { +private final class LocalQueue extends LocalQueuePadding { import LocalQueueConstants._ import TracingConstants._ diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index a0e985395e..daa9e0f765 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -81,7 +81,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * References to worker threads and their local queues. */ private[this] val workerThreads: Array[WorkerThread[P]] = new Array(threadCount) - private[effect] val localQueues: Array[LocalQueue] = new Array(threadCount) + private[unsafe] val localQueues: Array[LocalQueue] = new Array(threadCount) private[unsafe] val sleepers: Array[TimerHeap] = new Array(threadCount) private[unsafe] val parkedSignals: Array[AtomicBoolean] = new Array(threadCount) private[unsafe] val fiberBags: Array[WeakBag[Runnable]] = new Array(threadCount) @@ -778,7 +778,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of worker threads backing the compute pool */ - private[effect] def getWorkerThreadCount(): Int = + private[unsafe] def getWorkerThreadCount(): Int = threadCount /** @@ -788,7 +788,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of active worker threads */ - private[effect] def getActiveThreadCount(): Int = { + private[unsafe] def getActiveThreadCount(): Int = { val st = state.get() (st & UnparkMask) >>> UnparkShift } @@ -800,7 +800,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of worker threads searching for work */ - private[effect] def getSearchingThreadCount(): Int = { + private[unsafe] def getSearchingThreadCount(): Int = { val st = state.get() st & SearchMask } @@ -812,7 +812,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of blocked worker threads */ - private[effect] def getBlockedWorkerThreadCount(): Int = + private[unsafe] def getBlockedWorkerThreadCount(): Int = blockedWorkerThreadCounter.get() /** @@ -821,7 +821,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the total number of fibers enqueued on all local queues */ - private[effect] def getLocalQueueFiberCount(): Long = + private[unsafe] def getLocalQueueFiberCount(): Long = localQueues.map(_.size().toLong).sum /** @@ -834,7 +834,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef]( * @return * the number of asynchronously suspended fibers */ - private[effect] def getSuspendedFiberCount(): Long = + private[unsafe] def getSuspendedFiberCount(): Long = workerThreads.map(_.getSuspendedFiberCount().toLong).sum } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala similarity index 84% rename from core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala index ac70947070..c85d69f845 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvation.scala @@ -14,9 +14,12 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics + +private[effect] final class CpuStarvation( + sampler: CpuStarvationSampler +) extends CpuStarvationMBean { -private final class CpuStarvation(sampler: CpuStarvationSampler) extends CpuStarvationMBean { def getCpuStarvationCount(): Long = sampler.cpuStarvationCount() @@ -26,5 +29,3 @@ private final class CpuStarvation(sampler: CpuStarvationSampler) extends CpuStar def getCurrentClockDriftMs(): Long = sampler.clockDriftCurrentMs() } - -private object CpuStarvation diff --git a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala similarity index 71% rename from core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala index b203ede6b7..6066e2470e 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/CpuStarvationMbean.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMBean.scala @@ -14,12 +14,12 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics /** * An MBean interfaces for monitoring when CPU starvation occurs. */ -private[metrics] trait CpuStarvationMBean { +private[unsafe] trait CpuStarvationMBean { /** * Returns the number of times CPU starvation has occurred. @@ -45,19 +45,3 @@ private[metrics] trait CpuStarvationMBean { */ def getCurrentClockDriftMs(): Long } - -private[metrics] object CpuStarvationMBean { - - private[metrics] def apply(metrics: CpuStarvationSampler): CpuStarvationMBean = - new CpuStarvationMBean { - def getCpuStarvationCount(): Long = - metrics.cpuStarvationCount() - - def getMaxClockDriftMs(): Long = - metrics.clockDriftMaxMs() - - def getCurrentClockDriftMs(): Long = - metrics.clockDriftCurrentMs() - } - -} diff --git a/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala similarity index 91% rename from core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala index b03deeb3c1..417008e277 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsCompanionPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsCompanionPlatform.scala @@ -14,14 +14,14 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics import scala.concurrent.ExecutionContext private[metrics] abstract class IORuntimeMetricsCompanionPlatform { this: IORuntimeMetrics.type => - private[effect] def apply(ec: ExecutionContext): IORuntimeMetrics = + private[unsafe] def apply(ec: ExecutionContext): IORuntimeMetrics = new IORuntimeMetrics { private[effect] val cpuStarvationSampler: CpuStarvationSampler = CpuStarvationSampler() diff --git a/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala similarity index 95% rename from core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala index 3e2fd08314..a596c1c4d1 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/IORuntimeMetricsPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala @@ -14,10 +14,8 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => - def workStealingThreadPool: Option[WorkStealingPoolMetrics] - } diff --git a/core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala similarity index 97% rename from core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala rename to core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala index 45cf2ab201..7de266d8b0 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/WorkStealingPoolMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala @@ -14,9 +14,8 @@ * limitations under the License. */ -package cats.effect.metrics - -import cats.effect.unsafe.{LocalQueue, WorkStealingThreadPool} +package cats.effect.unsafe +package metrics import scala.concurrent.ExecutionContext diff --git a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala index 161068bef1..600cc752e1 100644 --- a/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala +++ b/core/shared/src/main/scala/cats/effect/CpuStarvationCheck.scala @@ -16,9 +16,10 @@ package cats.effect -import cats.effect.metrics.{CpuStarvationSampler, CpuStarvationWarningMetrics} +import cats.effect.metrics.CpuStarvationWarningMetrics import cats.effect.std.Console import cats.effect.unsafe.IORuntimeConfig +import cats.effect.unsafe.metrics.CpuStarvationSampler import cats.syntax.all._ import scala.concurrent.duration.{Duration, FiniteDuration} diff --git a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala index 3bc9dfc481..64d1f34c76 100644 --- a/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/IORuntime.scala @@ -18,7 +18,7 @@ package cats.effect package unsafe import cats.effect.Platform.static -import cats.effect.metrics.IORuntimeMetrics +import cats.effect.unsafe.metrics.IORuntimeMetrics import scala.concurrent.ExecutionContext diff --git a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala similarity index 97% rename from core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala rename to core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala index 94f32128c4..a4f46c6c62 100644 --- a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationMetrics.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationMetrics.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics import scala.concurrent.duration._ diff --git a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala similarity index 97% rename from core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala rename to core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala index ffe4758cb0..574b58a1a0 100644 --- a/core/shared/src/main/scala/cats/effect/metrics/CpuStarvationSampler.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/CpuStarvationSampler.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics import cats.effect.IO diff --git a/core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala b/core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala similarity index 96% rename from core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala rename to core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala index 098c19b92d..bfecdb0e8e 100644 --- a/core/shared/src/main/scala/cats/effect/metrics/IORuntimeMetrics.scala +++ b/core/shared/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetrics.scala @@ -14,7 +14,7 @@ * limitations under the License. */ -package cats.effect.metrics +package cats.effect.unsafe.metrics /** * The runtime-specific metrics. From 3260e37a8e084f245b62fa9ec115838747553e36 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Wed, 31 Jul 2024 11:09:36 +0300 Subject: [PATCH 3/4] Add comments --- .../metrics/IORuntimeMetricsPlatform.scala | 10 ++ .../metrics/WorkStealingPoolMetrics.scala | 157 +++++++++++++++++- 2 files changed, 163 insertions(+), 4 deletions(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala index a596c1c4d1..2da5a2d6bf 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/IORuntimeMetricsPlatform.scala @@ -17,5 +17,15 @@ package cats.effect.unsafe.metrics private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => + + /** + * Returns work-stealing thread pool metrics. + * + * @example + * {{{ + * val runtime: IORuntime = ??? + * val totalWorkers = runtime.metrics.workStealingThreadPool.map(_.compute.workerThreadCount()) + * }}} + */ def workStealingThreadPool: Option[WorkStealingPoolMetrics] } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala index 7de266d8b0..427d3b3708 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/metrics/WorkStealingPoolMetrics.scala @@ -19,36 +19,185 @@ package metrics import scala.concurrent.ExecutionContext +/** + * Represents metrics associated with a work-stealing thread pool. + */ sealed trait WorkStealingPoolMetrics { + + /** + * The hash code of the instrumented work-stealing thread pool. This hash uniquely identifies + * the specific thread pool. + */ def hash: String + /** + * Compute-specific metrics of the work-stealing thread pool. + */ def compute: WorkStealingPoolMetrics.ComputeMetrics + /** + * The list of queue-specific metrics of the work-stealing thread pool. + */ def localQueues: List[WorkStealingPoolMetrics.LocalQueueMetrics] } object WorkStealingPoolMetrics { + /** + * The compute metrics of the work-stealing thread pool (WSTP). + */ sealed trait ComputeMetrics { + + /** + * The number of worker thread instances backing the work-stealing thread pool (WSTP). + * + * @note + * this is a fixed value, as the WSTP has a fixed number of worker threads. + */ def workerThreadCount(): Int + + /** + * The number of active worker thread instances currently executing fibers on the compute + * thread pool. + * + * @note + * the value may differ between invocations + */ def activeThreadCount(): Int + + /** + * The number of worker thread instances currently searching for fibers to steal from other + * worker threads. + * + * @note + * the value may differ between invocations + */ def searchingThreadCount(): Int + + /** + * The number of worker thread instances that can run blocking actions on the compute thread + * pool. + * + * @note + * the value may differ between invocations + */ def blockedWorkerThreadCount(): Int + + /** + * The total number of fibers enqueued on all local queues. + * + * @note + * the value may differ between invocations + */ def localQueueFiberCount(): Long + + /** + * The number of fibers which are currently asynchronously suspended. + * + * @note + * This counter is not synchronized due to performance reasons and might be reporting + * out-of-date numbers. + * + * @note + * the value may differ between invocations + */ def suspendedFiberCount(): Long } + /** + * The metrics of the local queue. + */ sealed trait LocalQueueMetrics { + + /** + * The index of the LocalQueue. + */ def index: Int + + /** + * The current number of enqueued fibers. + * + * @note + * the value may differ between invocations + */ def fiberCount(): Int - def headIndex(): Int - def tailIndex(): Int + + /** + * The total number of fibers enqueued during the lifetime of the local queue. + * + * @note + * the value may differ between invocations + */ def totalFiberCount(): Long + + /** + * The total number of fibers spilt over to the external queue. + * + * @note + * the value may differ between invocations + */ def totalSpilloverCount(): Long + + /** + * The total number of successful steal attempts by other worker threads. + * + * @note + * the value may differ between invocations + */ def successfulStealAttemptCount(): Long + + /** + * The total number of stolen fibers by other worker threads. + * + * @note + * the value may differ between invocations + */ def stolenFiberCount(): Long + + /** + * The index that represents the head of the queue. + * + * @note + * the value may differ between invocations + */ + def headIndex(): Int + + /** + * The index that represents the tail of the queue. + * + * @note + * the value may differ between invocations + */ + def tailIndex(): Int + + /** + * The "real" value of the head of the local queue. This value represents the state of the + * head which is valid for the owner worker thread. This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ def realHeadTag(): Int + + /** + * The "steal" tag of the head of the local queue. This value represents the state of the + * head which is valid for any worker thread looking to steal work from this local queue. + * This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ def stealHeadTag(): Int + + /** + * The "tail" tag of the tail of the local queue. This value represents the state of the + * tail which is valid for the owner worker thread, used for enqueuing fibers into the local + * queue, as well as any other worker thread looking to steal work from this local queue, + * used for calculating how many fibers to steal. This is an unsigned 16 bit integer. + * + * @note + * the value may differ between invocations + */ def tailTag(): Int } @@ -88,12 +237,12 @@ object WorkStealingPoolMetrics { new LocalQueueMetrics { def index: Int = idx def fiberCount(): Int = queue.getFiberCount() - def headIndex(): Int = queue.getHeadIndex() - def tailIndex(): Int = queue.getTailIndex() def totalFiberCount(): Long = queue.getTotalFiberCount() def totalSpilloverCount(): Long = queue.getTotalSpilloverCount() def successfulStealAttemptCount(): Long = queue.getSuccessfulStealAttemptCount() def stolenFiberCount(): Long = queue.getStolenFiberCount() + def headIndex(): Int = queue.getHeadIndex() + def tailIndex(): Int = queue.getTailIndex() def realHeadTag(): Int = queue.getRealHeadTag() def stealHeadTag(): Int = queue.getStealHeadTag() def tailTag(): Int = queue.getTailTag() From 9b7cd4c13865d9524ddc9472858e8125357cf6a9 Mon Sep 17 00:00:00 2001 From: Maksym Ochenashko Date: Fri, 22 Nov 2024 09:31:21 +0200 Subject: [PATCH 4/4] Rename CPUStarvation MBean namespace to `cats.effect.unsafe.metrics` --- .../scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala index 0be6767c75..7a0768ab63 100644 --- a/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala +++ b/core/jvm/src/main/scala/cats/effect/metrics/JvmCpuStarvationMetrics.scala @@ -27,7 +27,8 @@ import java.lang.management.ManagementFactory import javax.management.{MBeanServer, ObjectName} private[effect] object JvmCpuStarvationMetrics { - private[this] val mBeanObjectName = new ObjectName("cats.effect.metrics:type=CpuStarvation") + private[this] val mBeanObjectName = new ObjectName( + "cats.effect.unsafe.metrics:type=CpuStarvation") private[this] def warning(th: Throwable) = { val exceptionWriter = new StringWriter()