Skip to content
This repository was archived by the owner on Apr 27, 2026. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 13 additions & 65 deletions jflyte/src/main/java/org/flyte/jflyte/ExecuteDynamicWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.flyte.jflyte.MoreCollectors.mapValues;
import static org.flyte.jflyte.MoreCollectors.toUnmodifiableList;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Collection;
Expand All @@ -37,6 +38,7 @@
import org.flyte.api.v1.DynamicWorkflowTaskRegistrar;
import org.flyte.api.v1.Literal;
import org.flyte.api.v1.Node;
import org.flyte.api.v1.PartialIdentifier;
import org.flyte.api.v1.PartialLaunchPlanIdentifier;
import org.flyte.api.v1.PartialTaskIdentifier;
import org.flyte.api.v1.PartialWorkflowIdentifier;
Expand Down Expand Up @@ -173,10 +175,18 @@ static DynamicJobSpec rewrite(
Map<TaskIdentifier, TaskTemplate> taskTemplates,
Map<WorkflowIdentifier, WorkflowTemplate> workflowTemplates) {

DynamicWorkflowIdentifierRewrite rewrite = new DynamicWorkflowIdentifierRewrite(config);
IdentifierRewrite rewrite =
IdentifierRewrite.builder()
.adminClient(null)
.domain(config.domain())
.project(config.project())
.version(config.version())
.build();

final IdentifierRewrite.Visitor visitor = rewrite.visitor();

List<Node> rewrittenNodes =
spec.nodes().stream().map(rewrite::visitNode).collect(toUnmodifiableList());
spec.nodes().stream().map(visitor::visitNode).collect(toUnmodifiableList());

Map<WorkflowIdentifier, WorkflowTemplate> usedSubWorkflows =
ProjectClosure.collectSubWorkflows(rewrittenNodes, workflowTemplates);
Expand All @@ -188,7 +198,7 @@ static DynamicJobSpec rewrite(
// and workflows

Map<WorkflowIdentifier, WorkflowTemplate> rewrittenUsedSubWorkflows =
mapValues(usedSubWorkflows, rewrite::visitWorkflowTemplate);
mapValues(usedSubWorkflows, visitor::visitWorkflowTemplate);

return spec.toBuilder()
.nodes(rewrittenNodes)
Expand All @@ -205,68 +215,6 @@ static DynamicJobSpec rewrite(
.build();
}

static class DynamicWorkflowIdentifierRewrite extends WorkflowNodeVisitor {
private final ExecutionConfig config;

DynamicWorkflowIdentifierRewrite(ExecutionConfig config) {
this.config = config;
}

@Override
PartialTaskIdentifier visitTaskIdentifier(PartialTaskIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialTaskIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote tasks: " + value);
}

@Override
PartialWorkflowIdentifier visitWorkflowIdentifier(PartialWorkflowIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialWorkflowIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

// in these cases all referenced workflows are sub-workflows, and we can't include
// templates for tasks used in them

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote workflows: " + value);
}

@Override
PartialLaunchPlanIdentifier visitLaunchPlanIdentifier(PartialLaunchPlanIdentifier value) {
if (value.project() == null && value.domain() == null && value.version() == null) {
return PartialLaunchPlanIdentifier.builder()
.name(value.name())
.project(config.project())
.domain(config.domain())
.version(config.version())
.build();
}

// we don't need to fetch anything, so we can use this reference, because
// for launch plans we don't need to include task and workflow templates into closure
if (value.project() != null && value.domain() != null && value.version() != null) {
return value;
}

throw new IllegalArgumentException(
"Dynamic workflow tasks don't support remote launch plans: " + value);
}
}

private static DynamicWorkflowTask getDynamicWorkflowTask(String name) {
// be careful not to pass extra
Map<String, String> env = getEnv();
Expand Down