Skip to content

Commit 62ef63a

Browse files
Add workflow on failure policy support (#237)
* Add workflow on failure policy support Signed-off-by: Andres Gomez Ferrer <[email protected]> * Fix unit test Signed-off-by: Andres Gomez Ferrer <[email protected]> * Add SdkWorkflowMetadata Signed-off-by: Andres Gomez Ferrer <[email protected]> * Configure FAIL_IMMEDIATELY default in the builder Signed-off-by: Andres Gomez Ferrer <[email protected]> --------- Signed-off-by: Andres Gomez Ferrer <[email protected]>
1 parent 0c9ba51 commit 62ef63a

File tree

7 files changed

+122
-12
lines changed

7 files changed

+122
-12
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2021 Flyte Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.api.v1;
18+
19+
import com.google.auto.value.AutoValue;
20+
21+
/** Failure Handling Strategy. */
22+
@AutoValue
23+
public abstract class OnFailurePolicy {
24+
public enum Kind {
25+
FAIL_IMMEDIATELY,
26+
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
27+
}
28+
29+
public abstract Kind getKind();
30+
31+
public static OnFailurePolicy.Builder builder() {
32+
return new AutoValue_OnFailurePolicy.Builder();
33+
}
34+
35+
@AutoValue.Builder
36+
public abstract static class Builder {
37+
public abstract Builder kind(Kind kind);
38+
39+
public abstract OnFailurePolicy build();
40+
}
41+
}

flytekit-api/src/main/java/org/flyte/api/v1/WorkflowMetadata.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,24 @@
1717
package org.flyte.api.v1;
1818

1919
import com.google.auto.value.AutoValue;
20+
import org.flyte.api.v1.OnFailurePolicy.Kind;
2021

2122
/** Metadata for the entire workflow. */
2223
@AutoValue
23-
public class WorkflowMetadata {
24+
public abstract class WorkflowMetadata {
25+
26+
public abstract OnFailurePolicy onFailure();
2427

2528
public static Builder builder() {
26-
return new AutoValue_WorkflowMetadata.Builder();
29+
return new AutoValue_WorkflowMetadata.Builder()
30+
.onFailure(OnFailurePolicy.builder().kind(Kind.FAIL_IMMEDIATELY).build());
2731
}
2832

2933
@AutoValue.Builder
3034
public abstract static class Builder {
3135

36+
public abstract Builder onFailure(OnFailurePolicy onFailure);
37+
3238
public abstract WorkflowMetadata build();
3339
}
3440
}

flytekit-java/src/main/java/org/flyte/flytekit/SdkWorkflowBuilder.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.Map;
2727
import java.util.Objects;
2828
import javax.annotation.Nullable;
29+
import org.flyte.api.v1.OnFailurePolicy;
30+
import org.flyte.api.v1.OnFailurePolicy.Kind;
2931
import org.flyte.api.v1.WorkflowTemplate;
3032

3133
/** Builder used during {@link SdkWorkflow#expand(SdkWorkflowBuilder)}. */
@@ -37,6 +39,11 @@ public class SdkWorkflowBuilder {
3739
private final Map<String, String> outputDescriptions;
3840
private final SdkNodeNamePolicy sdkNodeNamePolicy;
3941

42+
private SdkWorkflowMetadata workflowMetadata =
43+
SdkWorkflowMetadata.builder()
44+
.onFailure(OnFailurePolicy.builder().kind(Kind.FAIL_IMMEDIATELY).build())
45+
.build();
46+
4047
/** Creates a new builder. */
4148
public SdkWorkflowBuilder() {
4249
this(new SdkNodeNamePolicy());
@@ -54,6 +61,15 @@ public SdkWorkflowBuilder() {
5461

5562
this.sdkNodeNamePolicy = sdkNodeNamePolicy;
5663
}
64+
65+
public void setWorkflowMetadata(SdkWorkflowMetadata workflowMetadata) {
66+
this.workflowMetadata = workflowMetadata;
67+
}
68+
69+
public SdkWorkflowMetadata getWorkflowMetadata() {
70+
return this.workflowMetadata;
71+
}
72+
5773
/**
5874
* Applies the given transformation and returns a new node with a given node id.
5975
*
@@ -99,7 +115,7 @@ public <OutputT> SdkNode<OutputT> applyWithInputMap(
99115
* @return the new {@link SdkNode}
100116
*/
101117
public <OutputT> SdkNode<OutputT> apply(SdkTransform<Void, OutputT> transformWithoutInputs) {
102-
return apply(/*nodeId=*/ (String) null, transformWithoutInputs);
118+
return apply(/* nodeId= */ (String) null, transformWithoutInputs);
103119
}
104120

105121
/**
@@ -112,7 +128,7 @@ public <OutputT> SdkNode<OutputT> apply(SdkTransform<Void, OutputT> transformWit
112128
*/
113129
public <InputT, OutputT> SdkNode<OutputT> apply(
114130
SdkTransform<InputT, OutputT> transform, InputT inputs) {
115-
return apply(/*nodeId=*/ null, transform, inputs);
131+
return apply(/* nodeId= */ null, transform, inputs);
116132
}
117133

118134
/**
@@ -125,7 +141,7 @@ public <InputT, OutputT> SdkNode<OutputT> apply(
125141
*/
126142
public <OutputT> SdkNode<OutputT> applyWithInputMap(
127143
SdkTransform<?, OutputT> transform, Map<String, SdkBindingData<?>> inputs) {
128-
return applyWithInputMap(/*nodeId=*/ null, transform, inputs);
144+
return applyWithInputMap(/* nodeId= */ null, transform, inputs);
129145
}
130146

131147
protected <InputT, OutputT> SdkNode<OutputT> applyInternal(
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2021 Flyte Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
package org.flyte.flytekit;
18+
19+
import com.google.auto.value.AutoValue;
20+
import org.flyte.api.v1.OnFailurePolicy;
21+
22+
/** Metadata for the entire workflow. */
23+
@AutoValue
24+
public abstract class SdkWorkflowMetadata {
25+
26+
public abstract OnFailurePolicy onFailure();
27+
28+
public static Builder builder() {
29+
return new AutoValue_SdkWorkflowMetadata.Builder()
30+
.onFailure(OnFailurePolicy.builder().kind(OnFailurePolicy.Kind.FAIL_IMMEDIATELY).build());
31+
}
32+
33+
@AutoValue.Builder
34+
public abstract static class Builder {
35+
36+
public abstract Builder onFailure(OnFailurePolicy onFailure);
37+
38+
public abstract SdkWorkflowMetadata build();
39+
}
40+
}

flytekit-java/src/main/java/org/flyte/flytekit/WorkflowTemplateIdl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,14 @@
3232
class WorkflowTemplateIdl {
3333

3434
static WorkflowTemplate ofBuilder(SdkWorkflowBuilder builder) {
35-
WorkflowMetadata metadata = WorkflowMetadata.builder().build();
36-
3735
List<Node> nodes =
3836
builder.getNodes().values().stream().map(SdkNode::toIdl).collect(toUnmodifiableList());
3937

4038
List<Binding> outputs = getOutputBindings(builder);
4139

4240
return WorkflowTemplate.builder()
43-
.metadata(metadata)
41+
.metadata(
42+
WorkflowMetadata.builder().onFailure(builder.getWorkflowMetadata().onFailure()).build())
4443
.interface_(
4544
TypedInterface.builder()
4645
.inputs(getInputVariableMap(builder))

flytekit-scala_2.13/src/main/scala/org/flyte/flytekitscala/SdkScalaWorkflow.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
*/
1717
package org.flyte.flytekitscala
1818

19-
import org.flyte.api.v1.WorkflowTemplate
19+
import org.flyte.api.v1.{WorkflowMetadata, WorkflowTemplate}
2020
import org.flyte.flytekit.{
21-
SdkBindingData => SdkJavaBindingData,
2221
SdkNode,
2322
SdkTransform,
2423
SdkType,
2524
SdkWorkflow,
26-
SdkWorkflowBuilder
25+
SdkWorkflowBuilder,
26+
SdkWorkflowMetadata,
27+
SdkBindingData => SdkJavaBindingData
2728
}
2829

2930
import scala.collection.JavaConverters._
@@ -66,6 +67,10 @@ abstract class SdkScalaWorkflow[InputT, OutputT](
6667

6768
class SdkScalaWorkflowBuilder(builder: SdkWorkflowBuilder) {
6869

70+
def setWorkflowMetadata(workflowMetadata: SdkWorkflowMetadata): Unit =
71+
builder.setWorkflowMetadata(workflowMetadata)
72+
def getWorkflowMetadata(): SdkWorkflowMetadata = builder.getWorkflowMetadata
73+
6974
/** Get the nodes applied on the DAG.
7075
* @return
7176
* The workflows node by name.

jflyte/src/main/java/org/flyte/jflyte/ProtoUtil.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import flyteidl.core.Types;
5050
import flyteidl.core.Types.SchemaType.SchemaColumn.SchemaColumnType;
5151
import flyteidl.core.Workflow;
52+
import flyteidl.core.Workflow.WorkflowMetadata.OnFailurePolicy;
5253
import java.io.PrintWriter;
5354
import java.io.StringWriter;
5455
import java.time.Duration;
@@ -741,7 +742,9 @@ static Workflow.WorkflowTemplate serialize(WorkflowTemplate template) {
741742

742743
private static Workflow.WorkflowMetadata serialize(
743744
@SuppressWarnings("UnusedVariable") WorkflowMetadata metadata) {
744-
return Workflow.WorkflowMetadata.newBuilder().build();
745+
return Workflow.WorkflowMetadata.newBuilder()
746+
.setOnFailure(OnFailurePolicy.valueOf(metadata.onFailure().getKind().name()))
747+
.build();
745748
}
746749

747750
@VisibleForTesting

0 commit comments

Comments
 (0)