From c752a1aaae8ba2c5a5cacbf0094606dd50fbcb40 Mon Sep 17 00:00:00 2001 From: Babis Kiosidis Date: Thu, 17 Feb 2022 11:26:22 +0100 Subject: [PATCH] reuse the identifierrewrite class --- .../flyte/jflyte/ExecuteDynamicWorkflow.java | 78 ++++--------------- 1 file changed, 13 insertions(+), 65 deletions(-) diff --git a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java index 4abc9501f..b8f277d40 100644 --- a/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java +++ b/jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java @@ -22,6 +22,7 @@ import static org.flyte.jflyte.MoreCollectors.mapValues; import static org.flyte.jflyte.MoreCollectors.toUnmodifiableList; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import java.util.Collection; @@ -37,6 +38,7 @@ import org.flyte.api.v1.DynamicWorkflowTaskRegistrar; import org.flyte.api.v1.Literal; import org.flyte.api.v1.Node; +import org.flyte.api.v1.PartialIdentifier; import org.flyte.api.v1.PartialLaunchPlanIdentifier; import org.flyte.api.v1.PartialTaskIdentifier; import org.flyte.api.v1.PartialWorkflowIdentifier; @@ -173,10 +175,18 @@ static DynamicJobSpec rewrite( Map taskTemplates, Map workflowTemplates) { - DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config); + IdentifierRewrite rewrite = + IdentifierRewrite.builder() + .adminClient(null) + .domain(config.domain()) + .project(config.project()) + .version(config.version()) + .build(); + + final IdentifierRewrite.Visitor visitor = rewrite.visitor(); List rewrittenNodes = - spec.nodes().stream().map(rewrite::visitNode).collect(toUnmodifiableList()); + spec.nodes().stream().map(visitor::visitNode).collect(toUnmodifiableList()); Map usedSubWorkflows = ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates); @@ -188,7 +198,7 @@ static DynamicJobSpec rewrite( // and workflows Map rewrittenUsedSubWorkflows = - mapValues(usedSubWorkflows, rewrite::visitWorkflowTemplate); + mapValues(usedSubWorkflows, visitor::visitWorkflowTemplate); return spec.toBuilder() .nodes(rewrittenNodes) @@ -205,68 +215,6 @@ static DynamicJobSpec rewrite( .build(); } - static class DynamicWorkflowIdentifierRewrite extends WorkflowNodeVisitor { - private final ExecutionConfig config; - - DynamicWorkflowIdentifierRewrite(ExecutionConfig config) { - this.config = config; - } - - @Override - PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialTaskIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote tasks: " + value); - } - - @Override - PartialWorkflowIdentifier visitWorkflowIdentifier(PartialWorkflowIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialWorkflowIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // in these cases all referenced workflows are sub-workflows, and we can't include - // templates for tasks used in them - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote workflows: " + value); - } - - @Override - PartialLaunchPlanIdentifier visitLaunchPlanIdentifier(PartialLaunchPlanIdentifier value) { - if (value.project() == null && value.domain() == null && value.version() == null) { - return PartialLaunchPlanIdentifier.builder() - .name(value.name()) - .project(config.project()) - .domain(config.domain()) - .version(config.version()) - .build(); - } - - // we don't need to fetch anything, so we can use this reference, because - // for launch plans we don't need to include task and workflow templates into closure - if (value.project() != null && value.domain() != null && value.version() != null) { - return value; - } - - throw new IllegalArgumentException( - "Dynamic workflow tasks don't support remote launch plans: " + value); - } - } - private static DynamicWorkflowTask getDynamicWorkflowTask(String name) { // be careful not to pass extra Map env = getEnv();