Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Promise;

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -299,7 +300,11 @@ public static void clearWorkGroup() {

public synchronized static void createWorkGroup() {
if(workerGroup == null)
workerGroup = new NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS);
// Daemon event loops so a leaked client-side group (e.g. in-JVM coordinator tests, a missed
// clearWorkGroup(), or an in-flight async shutdownGracefully) cannot block JVM exit. This mirrors
// the daemon factory used for the server-side worker in FederatedWorker.
workerGroup = new NioEventLoopGroup(DMLConfig.DEFAULT_NUMBER_OF_FEDERATED_WORKER_THREADS,
new DefaultThreadFactory("fed-client-worker", true));
}

private static class DataRequestHandler extends ChannelInboundHandlerAdapter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.cors.CorsConfigBuilder;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.util.concurrent.DefaultThreadFactory;

public class FederatedMonitoringServer {
protected static Logger log = Logger.getLogger(FederatedMonitoringServer.class);
Expand All @@ -51,8 +52,10 @@ public FederatedMonitoringServer(int port, boolean debug) {

public void run() {
log.info("Setting up Federated Monitoring Backend on port " + _port);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Daemon event loops so a leaked in-JVM (test) monitoring server cannot block JVM exit. This mirrors
// the daemon factory used for the federated worker and client in FederatedWorker and FederatedData.
EventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("fed-monitoring-boss", true));
EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("fed-monitoring-pool", true));

try {
var corsConfig = CorsConfigBuilder.forAnyOrigin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -117,11 +118,22 @@ private static synchronized void updateCachedWorkers(List<WorkerModel> workers,

private static synchronized void startStatsCollectionProcess(int threadCount, double frequencySeconds) {
if (executorService == null) {
executorService = Executors.newScheduledThreadPool(threadCount);
// Daemon threads so this never-shut-down background stats collector cannot block JVM exit
// (e.g. keep a surefire test fork alive after the monitoring tests complete).
executorService = Executors.newScheduledThreadPool(threadCount, daemonThreadFactory());
executorService.scheduleAtFixedRate(syncWorkerStatisticsRunnable(), 0, Math.round(frequencySeconds * 1000), TimeUnit.MILLISECONDS);
}
}

private static ThreadFactory daemonThreadFactory() {
final ThreadFactory base = Executors.defaultThreadFactory();
return r -> {
Thread t = base.newThread(r);
t.setDaemon(true);
return t;
};
}

public static void syncWorkerStatisticsWithDB(StatisticsModel stats, Long id) {

// NOTE: This part of the code is not directly connected to requests coming from the frontend
Expand Down
Loading