diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java index 98572e2ddd0..19277ba0843 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedData.java @@ -66,6 +66,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Promise; @SuppressWarnings("deprecation") @@ -299,7 +300,11 @@ public static void clearWorkGroup() { public synchronized static void createWorkGroup() { if(workerGroup == null) - workerGroup = new NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS); + // Daemon event loops so a leaked client-side group (e.g. in-JVM coordinator tests, a missed + // clearWorkGroup(), or an in-flight async shutdownGracefully) cannot block JVM exit. This mirrors + // the daemon factory used for the server-side worker in FederatedWorker. + workerGroup = new NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS, + new DefaultThreadFactory("fed-client-worker", true)); } private static class DataRequestHandler extends ChannelInboundHandlerAdapter { diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java index 6b3b180a260..d1a482a689a 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/FederatedMonitoringServer.java @@ -34,6 +34,7 @@ import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.cors.CorsConfigBuilder; import io.netty.handler.codec.http.cors.CorsHandler; +import io.netty.util.concurrent.DefaultThreadFactory; public class FederatedMonitoringServer { protected static Logger log = Logger.getLogger(FederatedMonitoringServer.class); @@ -51,8 +52,10 @@ public FederatedMonitoringServer(int port, boolean debug) { public void run() { log.info("Setting up Federated Monitoring Backend on port " + _port); - EventLoopGroup bossGroup = new NioEventLoopGroup(); - EventLoopGroup workerGroup = new NioEventLoopGroup(); + // Daemon event loops so a leaked in-JVM (test) monitoring server cannot block JVM exit. This mirrors + // the daemon factory used for the federated worker and client in FederatedWorker and FederatedData. + EventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("fed-monitoring-boss", true)); + EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("fed-monitoring-pool", true)); try { var corsConfig = CorsConfigBuilder.forAnyOrigin() diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java index a2ee2843405..f294fbd5c17 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.tuple.MutablePair; @@ -117,11 +118,22 @@ private static synchronized void updateCachedWorkers(List workers, private static synchronized void startStatsCollectionProcess(int threadCount, double frequencySeconds) { if (executorService == null) { - executorService = Executors.newScheduledThreadPool(threadCount); + // Daemon threads so this never-shut-down background stats collector cannot block JVM exit + // (e.g. keep a surefire test fork alive after the monitoring tests complete). + executorService = Executors.newScheduledThreadPool(threadCount, daemonThreadFactory()); executorService.scheduleAtFixedRate(syncWorkerStatisticsRunnable(), 0, Math.round(frequencySeconds * 1000), TimeUnit.MILLISECONDS); } } + private static ThreadFactory daemonThreadFactory() { + final ThreadFactory base = Executors.defaultThreadFactory(); + return r -> { + Thread t = base.newThread(r); + t.setDaemon(true); + return t; + }; + } + public static void syncWorkerStatisticsWithDB(StatisticsModel stats, Long id) { // NOTE: This part of the code is not directly connected to requests coming from the frontend