diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index b5463d44267fe..7cf2d5c7b38ed 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -56,6 +56,8 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Path; @@ -304,7 +306,7 @@ public static SessionContext create( initializeConfiguration(defaultContext, environment, sessionId); final MutableURLClassLoader userClassLoader = FlinkUserCodeClassLoaders.create( - defaultContext.getDependencies().toArray(new URL[0]), + getDependencyURLs(defaultContext), SessionContext.class.getClassLoader(), configuration); final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader); @@ -318,6 +320,21 @@ public static SessionContext create( new OperationManager(operationExecutorService)); } + private static URL[] getDependencyURLs(DefaultContext defaultContext) { + return defaultContext.getDependencies().stream() + .map(SessionContext::toURL) + .toArray(URL[]::new); + } + + private static URL toURL(URI uri) { + try { + return uri.toURL(); + } catch (MalformedURLException | IllegalArgumentException e) { + throw new SqlGatewayException( + String.format("Failed to convert dependency URI '%s' to URL.", uri), e); + } + } + // ------------------------------------------------------------------------------------------------------------------ // Helpers // ------------------------------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java index 74ff1a16807b1..3f2dffd4d2a7b 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/context/SessionContextTest.java @@ -32,6 +32,7 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.api.utils.ThreadUtils; import org.junit.jupiter.api.AfterAll; @@ -40,6 +41,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.net.URI; import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; @@ -54,6 +56,7 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT; import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test {@link SessionContext}. */ class SessionContextTest { @@ -146,6 +149,50 @@ void testSetAndResetArbitraryKey() { .matches((conf) -> !conf.containsKey("aa") && !conf.containsKey("bb")); } + @Test + void testCreateContextWithDependencies(@TempDir Path dependencyDirectory) throws Exception { + DefaultContext defaultContext = + new DefaultContext( + new Configuration(), + Collections.singletonList(dependencyDirectory.toUri())); + SessionEnvironment environment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + + SessionContext context = + SessionContext.create( + defaultContext, SessionHandle.create(), environment, EXECUTOR_SERVICE); + try { + assertThat(context.getUserClassloader().getURLs()) + .contains(dependencyDirectory.toUri().toURL()); + } finally { + context.close(); + } + } + + @Test + void testCreateContextWrapsSchemelessDependencyUri() { + DefaultContext defaultContext = + new DefaultContext( + new Configuration(), Collections.singletonList(URI.create("foo.jar"))); + SessionEnvironment environment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + + assertThatThrownBy( + () -> + SessionContext.create( + defaultContext, + SessionHandle.create(), + environment, + EXECUTOR_SERVICE)) + .isInstanceOf(SqlGatewayException.class) + .hasMessageContaining("Failed to convert dependency URI 'foo.jar' to URL.") + .hasCauseInstanceOf(IllegalArgumentException.class); + } + @Test void testCreateContextWithListeners() { assertThat(