From 72a6dcdb8ab1c6191de93360c1fbb2ffb6233e91 Mon Sep 17 00:00:00 2001 From: liumengkai <1623857502@qq.com> Date: Fri, 2 Dec 2022 17:40:37 +0800 Subject: [PATCH] [environment-##1426] delete tmp flink-distribute-cache-xxx dir after job is done --- .../environment/MyLocalStreamEnvironment.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/environment/MyLocalStreamEnvironment.java b/chunjun-core/src/main/java/com/dtstack/chunjun/environment/MyLocalStreamEnvironment.java index f55b541eca..80cc0cef23 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/environment/MyLocalStreamEnvironment.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/environment/MyLocalStreamEnvironment.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; @@ -33,11 +34,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.File; +import java.io.IOException; import java.net.URL; import java.util.Collections; import java.util.List; @@ -113,6 +117,22 @@ private static Configuration validateAndGetConfiguration(final Configuration con return effectiveConfiguration; } + private void clearFlinkLocalDsitributeCache(JobGraph jobGraph) throws IOException { + File osTmpDir = new File(System.getProperty("java.io.tmpdir")); + JobID jobID = jobGraph.getJobID(); + File[] flinkTmpDirs = + osTmpDir.listFiles( + (dir, name) -> name.startsWith("flink-distributed-cache-" + jobID)); + + if (flinkTmpDirs != null && flinkTmpDirs.length > 0) { + for (File flinkTmpDir : flinkTmpDirs) { + if (flinkTmpDir.exists() && flinkTmpDir.isDirectory()) { + FileUtils.deleteDirectory(flinkTmpDir); + } + } + } + } + @Override protected Configuration getConfiguration() { return configuration; @@ -175,6 +195,7 @@ public JobExecutionResult execute(String jobName) throws Exception { } finally { transformations.clear(); miniCluster.close(); + clearFlinkLocalDsitributeCache(jobGraph); } } }