Skip to content

Commit 2d8a55c

Browse files
authored
feat: support for side input (#65)
Signed-off-by: Yashash H L <[email protected]>
1 parent c9503e1 commit 2d8a55c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1181
-216
lines changed

examples/src/main/java/io/numaproj/numaflow/examples/map/forward/ForwardFunction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
*/
1212

1313
public class ForwardFunction extends Mapper {
14-
1514
public static void main(String[] args) throws Exception {
1615
new Server(new ForwardFunction()).start();
1716
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.numaproj.numaflow.examples.sideinput;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
import lombok.Setter;
6+
import lombok.ToString;
7+
8+
/**
9+
* A simple config class to hold the source and sampling rate.
10+
* This config will be serialized and used as a side input message.
11+
*/
12+
@Getter
13+
@Setter
14+
@AllArgsConstructor
15+
@ToString
16+
public class Config {
17+
private String source;
18+
private float sampling;
19+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# SideInput Example
2+
3+
An example that demonstrates how to write a `sideinput` function along with a sample `User Defined function`
4+
which watches and uses the corresponding side input.
5+
6+
### SideInput
7+
```java
8+
public class SimpleSideInput extends SideInputRetriever {
9+
private final Config config;
10+
private final ObjectMapper jsonMapper = new ObjectMapper();
11+
12+
public SimpleSideInput(Config config) {
13+
this.config = config;
14+
}
15+
16+
@Override
17+
public Message retrieveSideInput() {
18+
byte[] val;
19+
if (0.9 > config.getDropRatio()) {
20+
config.setDropRatio(0.5F);
21+
} else {
22+
config.setDropRatio(config.getDropRatio() + 0.01F);
23+
}
24+
try {
25+
val = jsonMapper.writeValueAsBytes(config);
26+
return Message.broadcastMessage(val);
27+
} catch (JsonProcessingException e) {
28+
return Message.noBroadcastMessage();
29+
}
30+
}
31+
32+
public static void main(String[] args) throws Exception {
33+
new Server(new SimpleSideInput(new Config("sampling", 0.5F))).start();
34+
}
35+
}
36+
```
37+
After performing the retrieval/update for the side input value the user can choose to either broadcast the
38+
message to other side input vertices or drop the message. The side input message is not retried.
39+
40+
For each side input there will be a file with the given path and after any update to the side input value the file will
41+
be updated.
42+
43+
The directory is fixed and can be accessed through constants `Constants.SIDE_INPUT_DIR`.
44+
The file name is the name of the side input.
45+
```text
46+
Constants.SIDE_INPUT_DIR -> "/var/numaflow/side-inputs"
47+
sideInputFileName -> "/var/numaflow/side-inputs/sideInputName"
48+
```
49+
50+
### User Defined Function
51+
52+
The UDF vertex will watch for changes to this file and whenever there is a change it will read the file to obtain the new side input value.
53+
54+
### Pipeline spec
55+
56+
In the spec we need to define the side input vertex and the UDF vertex. The UDF vertex will have the side input vertex as a side input.
57+
58+
Side input spec:
59+
```yaml
60+
spec:
61+
sideInputs:
62+
- name: myticker
63+
container:
64+
image: "quay.io/numaio/numaflow-java/sideinput:v0.5.0"
65+
imagePullPolicy: Always
66+
trigger:
67+
schedule: "*/2 * * * *"
68+
69+
```
70+
71+
Vertex spec for the UDF vertex:
72+
```yaml
73+
- name: si-log
74+
udf:
75+
container:
76+
image: "quay.io/numaio/numaflow-java/udf-sideinput:v0.5.0"
77+
imagePullPolicy: Always
78+
containerTemplate:
79+
env:
80+
- name: NUMAFLOW_DEBUG
81+
value: "true" # DO NOT forget the double quotes!!!
82+
sideInputs:
83+
- myticker
84+
```
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.numaproj.numaflow.examples.sideinput.simple;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.numaproj.numaflow.examples.sideinput.Config;
6+
import io.numaproj.numaflow.sideinput.Message;
7+
import io.numaproj.numaflow.sideinput.Server;
8+
import io.numaproj.numaflow.sideinput.SideInputRetriever;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
/**
12+
* This is a simple side input example.
13+
* Example shows how to broadcast a message to other side input vertices.
14+
* This will be invoked for every fixed interval of time(defined in the pipeline config).
15+
* We are using a simple config class to hold the source and sampling rate(side input).
16+
* we are incrementing the sampling rate by 0.01 for every broadcast.
17+
* And if the sampling rate is greater than 0.9, we will reset it to 0.5.
18+
* In case of failure to serialize the config, we will drop the message(we will not broadcast the message).
19+
*/
20+
21+
@Slf4j
22+
public class SimpleSideInput extends SideInputRetriever {
23+
private final Config config;
24+
private final ObjectMapper jsonMapper = new ObjectMapper();
25+
26+
public SimpleSideInput(Config config) {
27+
this.config = config;
28+
}
29+
30+
@Override
31+
public Message retrieveSideInput() {
32+
byte[] val;
33+
if (0.9 > config.getSampling()) {
34+
config.setSampling(0.5F);
35+
} else {
36+
config.setSampling(config.getSampling() + 0.01F);
37+
}
38+
try {
39+
val = jsonMapper.writeValueAsBytes(config);
40+
// broadcastMessage will broadcast the message to other side input vertices
41+
log.info("Broadcasting side input message: {}", new String(val));
42+
return Message.createBroadcastMessage(val);
43+
} catch (JsonProcessingException e) {
44+
// noBroadcastMessage will drop the message
45+
log.error("Failed to serialize config: {}", e.getMessage());
46+
return Message.createNoBroadcastMessage();
47+
}
48+
}
49+
50+
public static void main(String[] args) throws Exception {
51+
new Server(new SimpleSideInput(new Config("sampling", 0.5F))).start();
52+
}
53+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package io.numaproj.numaflow.examples.sideinput.udf;
2+
3+
import java.io.BufferedReader;
4+
import java.io.FileReader;
5+
import java.io.IOException;
6+
import java.nio.file.FileSystems;
7+
import java.nio.file.Path;
8+
import java.nio.file.Paths;
9+
import java.nio.file.StandardWatchEventKinds;
10+
import java.nio.file.WatchEvent;
11+
import java.nio.file.WatchKey;
12+
import java.nio.file.WatchService;
13+
14+
/**
15+
SideInputWatcher is used to watch for side input file changes.
16+
*/
17+
public class SideInputWatcher {
18+
19+
private final String fileName;
20+
private final String dirPath;
21+
private WatchService watchService;
22+
private Thread watchThread;
23+
private String content = ""; // Shared variable to store the content
24+
25+
public SideInputWatcher(String dirPath, String fileName) {
26+
this.dirPath = dirPath;
27+
this.fileName = fileName;
28+
try {
29+
this.watchService = FileSystems.getDefault().newWatchService();
30+
} catch (IOException e) {
31+
e.printStackTrace();
32+
}
33+
}
34+
35+
public void startWatching() {
36+
try {
37+
Path path = Paths.get(dirPath);
38+
path.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
39+
40+
watchThread = new Thread(() -> {
41+
try {
42+
while (true) {
43+
WatchKey key = watchService.take();
44+
for (WatchEvent<?> event : key.pollEvents()) {
45+
if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
46+
Path modifiedFile = (Path) event.context();
47+
// Check if the modified file is the same as the side input file
48+
if (modifiedFile.toString().equals(fileName)) {
49+
handleFileUpdate();
50+
}
51+
}
52+
}
53+
key.reset();
54+
}
55+
} catch (InterruptedException e) {
56+
e.printStackTrace();
57+
}
58+
});
59+
60+
watchThread.start();
61+
} catch (IOException e) {
62+
e.printStackTrace();
63+
}
64+
}
65+
66+
private void handleFileUpdate() {
67+
Path filePath = Paths.get(dirPath, fileName);
68+
try {
69+
String content = readFileContents(filePath);
70+
synchronized (this) {
71+
this.content = content;
72+
}
73+
} catch (IOException e) {
74+
e.printStackTrace();
75+
}
76+
}
77+
78+
private String readFileContents(Path filePath) throws IOException {
79+
StringBuilder contentBuilder = new StringBuilder();
80+
try (BufferedReader reader = new BufferedReader(new FileReader(filePath.toFile()))) {
81+
String line;
82+
while ((line = reader.readLine()) != null) {
83+
contentBuilder.append(line).append("\n");
84+
}
85+
}
86+
return contentBuilder.toString();
87+
}
88+
89+
public synchronized String getSideInput() {
90+
return content;
91+
}
92+
93+
public void stopWatching() {
94+
if (watchThread != null) {
95+
watchThread.interrupt();
96+
}
97+
}
98+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.numaproj.numaflow.examples.sideinput.udf;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import io.numaproj.numaflow.examples.sideinput.Config;
6+
import io.numaproj.numaflow.mapper.Datum;
7+
import io.numaproj.numaflow.mapper.Mapper;
8+
import io.numaproj.numaflow.mapper.Message;
9+
import io.numaproj.numaflow.mapper.MessageList;
10+
import io.numaproj.numaflow.mapper.Server;
11+
import io.numaproj.numaflow.sideinput.Constants;
12+
import lombok.extern.slf4j.Slf4j;
13+
14+
/**
15+
* This is a simple User Defined Map example with side input support.
16+
* This example shows how to watch for side input and use it in the map function.
17+
* we are using a simple config class to hold the source and sampling rate(side input).
18+
* We log the config in the map function.
19+
*/
20+
21+
@Slf4j
22+
public class SimpleMapWithSideInput extends Mapper {
23+
SideInputWatcher sideInputWatcher;
24+
ObjectMapper objectMapper = new ObjectMapper();
25+
Config config = new Config("sampling", 0.5F);
26+
27+
public SimpleMapWithSideInput(SideInputWatcher sideInputWatcher) {
28+
this.sideInputWatcher = sideInputWatcher;
29+
}
30+
31+
public static void main(String[] args) throws Exception {
32+
String sideInputName = "sampling-input";
33+
// Get the side input path and file from the environment variables
34+
String dirPath = Constants.SIDE_INPUT_DIR;
35+
36+
// Watch for side input
37+
SideInputWatcher sideInputWatcher = new SideInputWatcher(dirPath, sideInputName);
38+
sideInputWatcher.startWatching();
39+
40+
// start the server
41+
new Server(new SimpleMapWithSideInput(sideInputWatcher)).start();
42+
43+
// Stop watching for side input
44+
sideInputWatcher.stopWatching();
45+
}
46+
47+
public MessageList processMessage(String[] keys, Datum data) {
48+
// Get the side input
49+
String sideInput = sideInputWatcher.getSideInput();
50+
try {
51+
config = objectMapper.readValue(sideInput, Config.class);
52+
} catch (JsonProcessingException e) {
53+
log.error("Failed to deserialize config: {}", e.getMessage());
54+
return MessageList.newBuilder().addMessage(Message.toDrop()).build();
55+
}
56+
57+
log.info("side input - {}", config.toString());
58+
return MessageList
59+
.newBuilder()
60+
.addMessage(new Message(data.getValue(), keys))
61+
.build();
62+
}
63+
}

pom.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@
292292
<exclude>io.numaproj.numaflow.reduce.v1</exclude>
293293
<exclude>io.numaproj.numaflow.sourcetransformer.v1</exclude>
294294
<exclude>io.numaproj.numaflow.sink.v1</exclude>
295+
<exclude>io.numaproj.numaflow.sideinput.v1</exclude>
295296
</excludes>
296297
</configuration>
297298
<executions>
@@ -333,6 +334,7 @@
333334
<exclude>io/numaproj/numaflow/sourcetransformer/v1/*</exclude>
334335
<exclude>io/numaproj/numaflow/sink/v1/*</exclude>
335336
<exclude>io/numaproj/numaflow/shared/*</exclude>
337+
<exclude>io/numaproj/numaflow/sideinput/v1/*</exclude>
336338
</excludes>
337339
</configuration>
338340
</execution>
@@ -357,7 +359,7 @@
357359
<version>3.0.1</version>
358360
<configuration>
359361
<detectJavaApiLink>false</detectJavaApiLink>
360-
<excludePackageNames>io.numaproj.numaflow.info:io.numaproj.numaflow.map.v1:io.numaproj.numaflow.reduce.v1:io.numaproj.numaflow.mapstream.v1:io.numaproj.numaflow.sourcetransformer.v1:io.numaproj.numaflow.sink.v1:io.numaproj.numaflow.reduce.metadata:io.numaproj.numaflow.shared</excludePackageNames>
362+
<excludePackageNames>io.numaproj.numaflow.info:io.numaproj.numaflow.map.v1:io.numaproj.numaflow.reduce.v1:io.numaproj.numaflow.mapstream.v1:io.numaproj.numaflow.sourcetransformer.v1:io.numaproj.numaflow.sink.v1:io.numaproj.numaflow.reduce.metadata:io.numaproj.numaflow.shared:io.numaproj.numaflow.sideinput.v1</excludePackageNames>
361363
</configuration>
362364
<executions>
363365
<execution>

src/main/java/io/numaproj/numaflow/info/ServerInfoAccessor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package io.numaproj.numaflow.info;
22

3-
import io.numaproj.numaflow.shared.Constants;
4-
53
public interface ServerInfoAccessor {
4+
String DEFAULT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/server-info";
5+
6+
String INFO_EOF = "U+005C__END__";
7+
68
/**
79
* Get current runtime numaflow-java SDK version.
810
*/
@@ -11,7 +13,7 @@ public interface ServerInfoAccessor {
1113
/**
1214
* Delete filePath if it exists.
1315
* Write serverInfo to filePath in Json format.
14-
* Append {@link Constants#INFO_EOF} as a new line to indicate end of file.
16+
* Append {@link ServerInfoAccessor#INFO_EOF} as a new line to indicate end of file.
1517
*
1618
* @param serverInfo server information POJO
1719
* @param filePath file path to write to

0 commit comments

Comments
 (0)