diff --git a/docker/build.sh b/docker/build.sh
index 02e45675f10..2898effdc22 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -23,7 +23,7 @@
# Build the docker containers
# The first build is for running systemds through docker.
-docker image build -f docker/sysds.Dockerfile -t apache/systemds:latest .
+# docker image build -f docker/sysds.Dockerfile -t apache/systemds:latest .
# The second build is for testing systemds. This image installs the R dependencies needed to run the tests.
docker image build -f docker/testsysds.Dockerfile -t apache/systemds:testing-latest .
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index d79c61b8928..190243254ff 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -32,6 +32,10 @@ log="/tmp/sysdstest.log"
mvn -ntp -B test-compile 2>&1 | grep -E "BUILD|Total time:|---|Building SystemDS"
mvn -ntp -B test -D maven.test.skip=false -D automatedtestbase.outputbuffering=true -D test=$1 2>&1 | grep -v "already exists in destination." | tee $log
+# Merge Federated test runs.
+[ -f target/jacoco.exec ] && mv target/jacoco.exec target/jacoco_main.exec
+mvn -ntp -B jacoco:merge
+
grep_args="SUCCESS"
grepvals="$( tail -n 100 $log | grep $grep_args)"
diff --git a/docker/sysds.Dockerfile b/docker/sysds.Dockerfile
index 359bb14bb9e..cc6ef605d53 100644
--- a/docker/sysds.Dockerfile
+++ b/docker/sysds.Dockerfile
@@ -19,7 +19,7 @@
#
#-------------------------------------------------------------
-FROM ubuntu:24.04
+FROM ubuntu:24.04@sha256:6015f66923d7afbc53558d7ccffd325d43b4e249f41a6e93eef074c9505d2233
WORKDIR /usr/src/
diff --git a/docker/testsysds.Dockerfile b/docker/testsysds.Dockerfile
index 3b4781c16b4..2f63dace7fa 100644
--- a/docker/testsysds.Dockerfile
+++ b/docker/testsysds.Dockerfile
@@ -19,8 +19,7 @@
#
#-------------------------------------------------------------
-FROM ubuntu:24.04
-
+FROM ubuntu:24.04@sha256:6015f66923d7afbc53558d7ccffd325d43b4e249f41a6e93eef074c9505d2233
WORKDIR /usr/src/
ENV MAVEN_VERSION=3.9.9
@@ -34,9 +33,6 @@ ENV LC_ALL=en_US.UTF-8
ENV LANG=en_US.UTF-8
ENV LD_LIBRARY_PATH=/usr/local/lib/
-COPY ./src/test/scripts/installDependencies.R installDependencies.R
-COPY ./docker/entrypoint.sh /entrypoint.sh
-
RUN apt-get update -qq \
&& apt-get upgrade -y \
&& apt-get install -y --no-install-recommends \
@@ -74,7 +70,9 @@ RUN apt-get install -y --no-install-recommends \
r-base-dev \
r-base-core
+
# Install R packages
+COPY ./src/test/scripts/installDependencies.R installDependencies.R
RUN Rscript installDependencies.R \
&& rm -rf installDependencies.R \
&& rm -rf /var/lib/apt/lists/*
@@ -86,4 +84,9 @@ RUN wget -qO- https://github.com/microsoft/SEAL/archive/refs/tags/v3.7.0.tar.gz
&& cmake --build build \
&& cmake --install build
+# Finally copy the entrypoint script
+# This is last to enable quick updates to the script after initial local build.
+COPY ./docker/entrypoint.sh /entrypoint.sh
+
+
ENTRYPOINT ["/entrypoint.sh"]
diff --git a/pom.xml b/pom.xml
index 8723b8e3f54..b25d94cc7db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -459,7 +459,19 @@
${project.build.directory}
- */jacoco.exec
+ *.exec
+
+
+
+ ${project.build.directory}
+
+ transient_jacoco**/*.exec
+
+
+
+ ${project.build.directory}
+
+ federated_jacoco/*.exec
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
index b055a6848fc..55f2f17cd8a 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java
@@ -28,13 +28,6 @@
import javax.net.ssl.SSLException;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLScript;
@@ -50,25 +43,32 @@
import org.apache.sysds.runtime.lineage.LineageCache;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
+import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
import org.apache.sysds.utils.stats.Timing;
-import org.apache.sysds.runtime.lineage.LineageItem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.serialization.ClassResolvers;
+import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
-import io.netty.handler.codec.serialization.ObjectDecoder;
-import io.netty.handler.codec.serialization.ClassResolvers;
@SuppressWarnings("deprecation")
public class FederatedWorker {
- protected static Logger log = Logger.getLogger(FederatedWorker.class);
+ protected static Logger LOG = Logger.getLogger(FederatedWorker.class);
private final int _port;
private final FederatedLookupTable _flt;
@@ -96,7 +96,7 @@ public FederatedWorker(int port, boolean debug) {
}
private void run() {
- log.info("Setting up Federated Worker on port " + _port);
+ LOG.info("Setting up Federated Worker on port " + _port);
int par_conn = ConfigurationManager.getDMLConfig().getIntValue(DMLConfig.FEDERATED_PAR_CONN);
final int EVENT_LOOP_THREADS = (par_conn > 0) ? par_conn : InfrastructureAnalyzer.getLocalParallelism();
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
@@ -113,22 +113,23 @@ private void run() {
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
- log.info("Starting Federated Worker server at port: " + _port);
+ LOG.info("Starting Federated Worker server at port: " + _port);
ChannelFuture f = b.bind(_port).sync();
- log.info("Started Federated Worker at port: " + _port);
+ LOG.info("Started Federated Worker at port: " + _port);
f.channel().closeFuture().sync();
- }
+ }
catch(Exception e) {
- log.info("Federated worker interrupted");
+ LOG.info("Federated worker interrupted");
if(_debug) {
- log.error(e.getMessage());
+ LOG.error(e.getMessage());
e.printStackTrace();
}
}
finally {
- log.info("Federated Worker Shutting down.");
+ LOG.info("Federated Worker Shutting down.");
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
+
}
}
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index a81dedd0975..2c3dd11c6d0 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -28,6 +28,8 @@
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.charset.Charset;
@@ -1663,9 +1665,21 @@ protected static Process startLocalFedWorker(int port, String[] addArgs, int sle
"--add-opens=java.base/java.lang=ALL-UNNAMED" ,
"--add-opens=java.base/java.lang.ref=ALL-UNNAMED" ,
"--add-opens=java.base/java.util.concurrent=ALL-UNNAMED" ,
- "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",};
+
+ RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+ List jvmArgs = runtimeMxBean.getInputArguments();
+
+ for(String arg : jvmArgs) {
+ // add code coverage report
+ if(arg.contains("org.jacoco.agent"))
+ args = ArrayUtils.addAll(args,
+ new String[] {arg.replace("target/jacoco.exec", String.format("target/federated_jacoco/jacoco-%d.exec", port))});
+ }
+
+ args = ArrayUtils.addAll(args, new String[]{
"-cp", classpath,
- DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"};
+ DMLScript.class.getName(), "-w", Integer.toString(port), "-stats"});
if(addArgs != null)
args = ArrayUtils.addAll(args, addArgs);
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index aa869a29e35..195e36d6065 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -19,6 +19,12 @@
package org.apache.sysds.test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
@@ -26,11 +32,11 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
-import java.io.RandomAccessFile;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
+import java.io.RandomAccessFile;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -44,12 +50,8 @@
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.NotImplementedException;
@@ -3489,13 +3491,28 @@ public static void shutdownThread(Thread t) {
public static void shutdownThread(Process t) {
// kill the worker
if( t != null ) {
- Process d = t.destroyForcibly();
+ sendSigInt(t);// Attempt graceful termination
try {
- d.waitFor();
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- }
+ // Wait up to 1 second for the process to exit
+ if (!t.waitFor(10, TimeUnit.SECONDS)) {
+ // If still alive after 1 second, force kill
+ Process forciblyDestroyed = t.destroyForcibly();
+ forciblyDestroyed.waitFor(); // Wait until it's definitely terminated
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void sendSigInt(Process process) {
+ long pid = process.pid();
+ ProcessBuilder pb = new ProcessBuilder("kill", "-SIGINT", Long.toString(pid));
+ try {
+ pb.inheritIO().start().waitFor();
+ }
+ catch(IOException | InterruptedException e) {
+ e.printStackTrace();
}
}