File tree Expand file tree Collapse file tree 11 files changed +91
-31
lines changed
examples/src/main/java/io/numaproj/numaflow/examples
main/java/io/numaproj/numaflow
test/java/io/numaproj/numaflow/sink Expand file tree Collapse file tree 11 files changed +91
-31
lines changed Original file line number Diff line number Diff line change @@ -5,7 +5,7 @@ name: Maven Package
5
5
6
6
on :
7
7
release :
8
- types : [created]
8
+ types : [ created ]
9
9
10
10
jobs :
11
11
build :
@@ -16,19 +16,19 @@ jobs:
16
16
packages : write
17
17
18
18
steps :
19
- - uses : actions/checkout@v3
20
- - name : Set up JDK 11
21
- uses : actions/setup-java@v3
22
- with :
23
- java-version : ' 11'
24
- distribution : ' temurin'
25
- server-id : github # Value of the distributionManagement/repository/id field of the pom.xml
26
- settings-path : ${{ github.workspace }} # location for the settings.xml file
19
+ - uses : actions/checkout@v3
20
+ - name : Set up JDK 11
21
+ uses : actions/setup-java@v3
22
+ with :
23
+ java-version : ' 11'
24
+ distribution : ' temurin'
25
+ server-id : github # Value of the distributionManagement/repository/id field of the pom.xml
26
+ settings-path : ${{ github.workspace }} # location for the settings.xml file
27
27
28
- - name : Build with Maven
29
- run : mvn -B package --file pom.xml
28
+ - name : Build with Maven
29
+ run : mvn -B package --file pom.xml
30
30
31
- - name : Publish to GitHub Packages Apache Maven
32
- run : mvn deploy -s $GITHUB_WORKSPACE/settings.xml
33
- env :
34
- GITHUB_TOKEN : ${{ github.token }}
31
+ - name : Publish to GitHub Packages Apache Maven
32
+ run : mvn deploy -s $GITHUB_WORKSPACE/settings.xml
33
+ env :
34
+ GITHUB_TOKEN : ${{ github.token }}
Original file line number Diff line number Diff line change
1
+ package io .numaproj .numaflow .examples .function .evenodd ;
2
+
3
+ import io .numaproj .numaflow .function .FunctionServer ;
4
+ import io .numaproj .numaflow .function .Message ;
5
+ import io .numaproj .numaflow .function .map .MapFunc ;
6
+ import io .numaproj .numaflow .function .v1 .Udfunction ;
7
+
8
+ import java .io .IOException ;
9
+ import java .util .logging .Logger ;
10
+
11
+ public class Forward {
12
+ private static final Logger logger = Logger .getLogger (Forward .class .getName ());
13
+
14
+ private static Message [] process (String key , Udfunction .Datum data ) {
15
+ int value = Integer .parseInt (new String (data .getValue ().toByteArray ()));
16
+ if (value % 2 == 0 ) {
17
+ return new Message []{Message .to ("even" , data .getValue ().toByteArray ())};
18
+ }
19
+ return new Message []{Message .to ("odd" , data .getValue ().toByteArray ())};
20
+ }
21
+
22
+ public static void main (String [] args ) throws IOException {
23
+ logger .info ("Forward invoked" );
24
+ new FunctionServer ().registerMapper (new MapFunc (Forward ::process )).start ();
25
+ }
26
+ }
Original file line number Diff line number Diff line change
1
+ package io .numaproj .numaflow .examples .function .flatmap ;
2
+
3
+ import io .numaproj .numaflow .function .FunctionServer ;
4
+ import io .numaproj .numaflow .function .Message ;
5
+ import io .numaproj .numaflow .function .map .MapFunc ;
6
+ import io .numaproj .numaflow .function .v1 .Udfunction ;
7
+
8
+ import java .io .IOException ;
9
+ import java .util .logging .Logger ;
10
+
11
+ public class FlatMap {
12
+ private static final Logger logger = Logger .getLogger (FlatMap .class .getName ());
13
+
14
+ private static Message [] process (String key , Udfunction .Datum data ) {
15
+ String msg = new String (data .getValue ().toByteArray ());
16
+ String [] strs = msg .split ("," );
17
+ Message [] results = new Message [strs .length ];
18
+
19
+ for (int i = 0 ; i < strs .length ; i ++) {
20
+ results [i ] = Message .toAll (strs [i ].getBytes ());
21
+ }
22
+ return results ;
23
+ }
24
+
25
+ public static void main (String [] args ) throws IOException {
26
+ logger .info ("Flatmap invoked" );
27
+ new FunctionServer ().registerMapper (new MapFunc (FlatMap ::process )).start ();
28
+ }
29
+ }
Original file line number Diff line number Diff line change 2
2
3
3
import io .numaproj .numaflow .sink .Response ;
4
4
import io .numaproj .numaflow .sink .SinkDatumStream ;
5
- import io .numaproj .numaflow .sink .SinkDatumStreamImpl ;
6
5
import io .numaproj .numaflow .sink .SinkFunc ;
7
6
import io .numaproj .numaflow .sink .SinkServer ;
8
7
import io .numaproj .numaflow .sink .v1 .Udsink ;
@@ -20,7 +19,7 @@ private static List<Response> process(SinkDatumStream datumStream) {
20
19
21
20
while (true ) {
22
21
Udsink .Datum datum = datumStream .ReadMessage ();
23
- // DONE indicates the end of the input
22
+ // EOF indicates the end of the input
24
23
if (datum == SinkDatumStream .EOF ) {
25
24
break ;
26
25
}
Original file line number Diff line number Diff line change 6
6
7
7
<groupId >io.numaproj.numaflow</groupId >
8
8
<artifactId >numaflow-java</artifactId >
9
- <version >0.3.0-SNAPSHOT </version >
9
+ <version >0.3.1 </version >
10
10
<packaging >jar</packaging >
11
11
12
12
<name >numaflow-java</name >
Original file line number Diff line number Diff line change 1
1
<?xml version =" 1.0" encoding =" UTF-8" ?>
2
- <settings xmlns =" http://maven.apache .org/SETTINGS/1.0.0 "
3
- xmlns : xsi =" http://www.w3 .org/2001/XMLSchema-instance "
2
+ <settings xmlns : xsi =" http://www.w3 .org/2001/XMLSchema-instance "
3
+ xmlns =" http://maven.apache .org/SETTINGS/1.0.0 "
4
4
xsi : schemaLocation =" http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd" >
5
5
<activeProfiles >
6
6
<activeProfile >github</activeProfile >
13
13
<repository >
14
14
<id >central</id >
15
15
<url >https://repo1.maven.org/maven2</url >
16
- <releases ><enabled >true</enabled ></releases >
17
- <snapshots ><enabled >false</enabled ></snapshots >
16
+ <releases >
17
+ <enabled >true</enabled >
18
+ </releases >
19
+ <snapshots >
20
+ <enabled >false</enabled >
21
+ </snapshots >
18
22
</repository >
19
23
<repository >
20
24
<id >github</id >
32
36
<password >${env.JAVA_SDK_MAVEN_PASSWORD}</password >
33
37
</server >
34
38
</servers >
35
- </settings >
39
+ </settings >
Original file line number Diff line number Diff line change 7
7
* reduce handlers to read input messages.
8
8
*/
9
9
public interface ReduceDatumStream {
10
+ // EOF indicates the end of input
11
+ Udfunction .Datum EOF = Udfunction .Datum .newBuilder ().setKey ("EOF" ).build ();
12
+
10
13
/* ReadMessage can be used to read message from the stream
11
14
* returns null if there are no more messages to consume.*/
12
15
Udfunction .Datum ReadMessage ();
13
- // EOF indicates the end of input
14
- Udfunction .Datum EOF = Udfunction .Datum .newBuilder ().setKey ("EOF" ).build ();
15
16
}
Original file line number Diff line number Diff line change @@ -23,7 +23,8 @@ public Udfunction.Datum ReadMessage() {
23
23
try {
24
24
readMessage = blockingQueue .take ();
25
25
} catch (InterruptedException e ) {
26
- logger .severe ("Error occurred while reading message from datum stream" + e .getMessage ());
26
+ logger .severe (
27
+ "Error occurred while reading message from datum stream" + e .getMessage ());
27
28
Thread .interrupted ();
28
29
}
29
30
return readMessage ;
Original file line number Diff line number Diff line change 1
1
package io .numaproj .numaflow .sink ;
2
2
3
- import io .numaproj .numaflow .function .v1 .Udfunction ;
4
3
import io .numaproj .numaflow .sink .v1 .Udsink ;
5
4
6
5
/**
7
6
* SinkDatumStream is an interface which will be passed to
8
7
* sink handlers to read input messages.
9
8
*/
10
9
public interface SinkDatumStream {
10
+ // EOF indicates the end of input
11
+ Udsink .Datum EOF = Udsink .Datum .newBuilder ().setKey ("EOF" ).build ();
12
+
11
13
/* ReadMessage can be used to read message from the stream
12
14
* returns null if there are no more messages to consume.*/
13
15
Udsink .Datum ReadMessage ();
14
- // EOF indicates the end of input
15
- Udsink .Datum EOF = Udsink .Datum .newBuilder ().setKey ("EOF" ).build ();
16
16
}
Original file line number Diff line number Diff line change @@ -23,7 +23,8 @@ public Udsink.Datum ReadMessage() {
23
23
try {
24
24
readMessage = blockingQueue .take ();
25
25
} catch (InterruptedException e ) {
26
- logger .severe ("Error occurred while reading message from datum stream" + e .getMessage ());
26
+ logger .severe (
27
+ "Error occurred while reading message from datum stream" + e .getMessage ());
27
28
Thread .interrupted ();
28
29
}
29
30
return readMessage ;
You can’t perform that action at this time.
0 commit comments