Skip to content

Commit d3068ac

Browse files
authored
chore: update sink interface (#69)
Signed-off-by: Yashash H L <[email protected]>
1 parent b9970b8 commit d3068ac

File tree

13 files changed

+80
-45
lines changed

13 files changed

+80
-45
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ Add this dependency to your project's POM:
2525
<dependency>
2626
<groupId>io.numaproj.numaflow</groupId>
2727
<artifactId>numaflow-java</artifactId>
28-
<version>${latest}</version>
29-
<scope>compile</scope>
28+
<version>0.5.1</version>
3029
</dependency>
3130
```
3231

@@ -35,7 +34,8 @@ Add this dependency to your project's POM:
3534
Add this dependency to your project's build file:
3635

3736
```groovy
38-
compile "io.numaproj.numaflow:numaflow-java:${latest}"
37+
compile "io.numaproj.numaflow:numaflow-java:0.5.1"
38+
```
3939
```
4040
4141
### Build

examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>io.numaproj.numaflow</groupId>
1818
<artifactId>numaflow-java</artifactId>
19-
<version>0.5.0</version>
19+
<version>0.5.1</version>
2020
</dependency>
2121

2222
<dependency>
Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,45 @@
11
package io.numaproj.numaflow.examples.sink.simple;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
34
import io.numaproj.numaflow.sinker.Datum;
45
import io.numaproj.numaflow.sinker.Response;
6+
import io.numaproj.numaflow.sinker.ResponseList;
57
import io.numaproj.numaflow.sinker.Server;
68
import io.numaproj.numaflow.sinker.Sinker;
79
import lombok.extern.slf4j.Slf4j;
810

11+
912
/**
1013
* This is a simple User Defined Sink example which logs the input message
1114
*/
1215

1316
@Slf4j
1417
public class SimpleSink extends Sinker {
15-
18+
private final ObjectMapper mapper = new ObjectMapper();
19+
private final ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
1620
public static void main(String[] args) throws Exception {
1721
new Server(new SimpleSink()).start();
1822
}
1923

2024
@Override
21-
public Response processMessage(Datum datum) {
22-
log.info(new String(datum.getValue()));
23-
return Response.responseOK(datum.getId());
25+
public void processMessage(Datum datum) {
26+
try {
27+
String msg = new String(datum.getValue());
28+
log.info("Received message: {}", msg);
29+
responseListBuilder.addResponse(Response.responseOK(datum.getId()));
30+
} catch (Exception e) {
31+
responseListBuilder.addResponse(Response.responseFailure(datum.getId(), e.getMessage()));
32+
}
33+
}
34+
35+
@Override
36+
public ResponseList getResponse() {
37+
// Reset the builder after building the response to avoid keeping old responses in memory
38+
// this is required as the same sinker instance is used for multiple requests
39+
try {
40+
return responseListBuilder.build();
41+
} finally {
42+
responseListBuilder.clearResponses();
43+
}
2444
}
2545
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.numaproj.numaflow</groupId>
88
<artifactId>numaflow-java</artifactId>
9-
<version>0.5.0</version>
9+
<version>0.5.1</version>
1010
<packaging>jar</packaging>
1111

1212
<name>numaflow-java</name>

src/main/java/io/numaproj/numaflow/reducer/Constants.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,10 @@
11
package io.numaproj.numaflow.reducer;
22

3-
import io.grpc.Context;
4-
53
class Constants {
64
public static final int DEFAULT_MESSAGE_SIZE = 1024 * 1024 * 64;
75

86
public static final String DEFAULT_SOCKET_PATH = "/var/run/numaflow/reduce.sock";
97

10-
public static final String WIN_START_KEY = "x-numaflow-win-start-time";
11-
12-
public static final String WIN_END_KEY = "x-numaflow-win-end-time";
13-
14-
public static final Context.Key<String> WINDOW_START_TIME = Context.keyWithDefault(
15-
WIN_START_KEY,
16-
"");
17-
18-
public static final Context.Key<String> WINDOW_END_TIME = Context.keyWithDefault(
19-
WIN_END_KEY,
20-
"");
21-
228
public static final String EOF = "EOF";
239

2410
public static final String SUCCESS = "SUCCESS";

src/main/java/io/numaproj/numaflow/reducer/Service.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
1111
import io.numaproj.numaflow.reducer.metadata.IntervalWindowImpl;
1212
import io.numaproj.numaflow.reducer.metadata.MetadataImpl;
13+
import io.numaproj.numaflow.shared.GrpcServerUtils;
1314
import lombok.extern.slf4j.Slf4j;
1415

1516
import java.time.Instant;
@@ -41,8 +42,8 @@ public StreamObserver<ReduceOuterClass.ReduceRequest> reduceFn(final StreamObser
4142
}
4243

4344
// get window start and end time from gPRC metadata
44-
String winSt = io.numaproj.numaflow.reducer.Constants.WINDOW_START_TIME.get();
45-
String winEt = io.numaproj.numaflow.reducer.Constants.WINDOW_END_TIME.get();
45+
String winSt = GrpcServerUtils.WINDOW_START_TIME.get();
46+
String winEt = GrpcServerUtils.WINDOW_END_TIME.get();
4647

4748
// convert the start and end time to Instant
4849
Instant startTime = Instant.ofEpochMilli(Long.parseLong(winSt));

src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class GrpcServerUtils {
4747
WIN_END_KEY,
4848
"");
4949

50-
public static final Metadata.Key<String> DATUM_METADATA_WIN_START = io.grpc.Metadata.Key.of(
50+
public static final Metadata.Key<String> DATUM_METADATA_WIN_START = Metadata.Key.of(
5151
WIN_START_KEY,
5252
Metadata.ASCII_STRING_MARSHALLER);
5353

@@ -97,7 +97,7 @@ public static void writeServerInfo(ServerInfoAccessor serverInfoAccessor, String
9797
}
9898

9999
public static ServerBuilder<?> createServerBuilder(String socketPath, int maxMessageSize) {
100-
ServerInterceptor interceptor = new ServerInterceptor() {
100+
ServerInterceptor interceptor = new ServerInterceptor() {
101101
@Override
102102
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
103103
ServerCall<ReqT, RespT> call,

src/main/java/io/numaproj/numaflow/sinker/SinkActor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
class SinkActor extends AbstractActor {
1515

1616
private final Sinker sinker;
17-
private final ResponseList.ResponseListBuilder responseListBuilder = ResponseList.newBuilder();
1817

1918
public static Props props(Sinker sinker) {
2019
return Props.create(SinkActor.class, sinker);
@@ -33,12 +32,15 @@ public Receive createReceive() {
3332
.build();
3433
}
3534

35+
// invokeHandler is called by the sink server when a new message is received.
3636
private void invokeHandler(HandlerDatum handlerDatum) {
37-
responseListBuilder.addResponse(this.sinker.processMessage(handlerDatum));
37+
this.sinker.processMessage(handlerDatum);
3838
}
3939

40+
// getResult is called by the sink server when EOF is received.
4041
private void getResult(String eof) {
41-
getContext().getParent().tell(responseListBuilder.build(), ActorRef.noSender());
42+
ResponseList responseList = this.sinker.getResponse();
43+
getContext().getParent().tell(responseList, ActorRef.noSender());
4244
}
4345
}
4446

src/main/java/io/numaproj/numaflow/sinker/Sinker.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,20 @@
99
public abstract class Sinker {
1010
/**
1111
* method will be used for processing messages.
12+
* response for the message should be added to the
13+
* response list which will be returned by getResponse
1214
* @param datum current message to be processed
13-
* @return Response to indicate success or failure.
1415
*/
15-
public abstract Response processMessage(Datum datum);
16+
public abstract void processMessage(Datum datum);
17+
18+
/**
19+
* method will be used for returning the responses.
20+
* each message should have a response, if there are
21+
* n messages then there should be n responses and
22+
* each response should contain the id of the message
23+
* Response.responseOK() and Response.responseFailure() can
24+
* be used for creating the responses
25+
* @return ResponseList which contains the responses
26+
*/
27+
public abstract ResponseList getResponse();
1628
}

src/test/java/io/numaproj/numaflow/reducer/ServerErrTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
import io.grpc.testing.GrpcCleanupRule;
1616
import io.numaproj.numaflow.reduce.v1.ReduceGrpc;
1717
import io.numaproj.numaflow.reduce.v1.ReduceOuterClass;
18+
import io.numaproj.numaflow.shared.GrpcServerUtils;
1819
import org.junit.After;
1920
import org.junit.Before;
2021
import org.junit.Rule;
2122
import org.junit.Test;
2223

23-
import static io.numaproj.numaflow.reducer.Constants.WIN_END_KEY;
24-
import static io.numaproj.numaflow.reducer.Constants.WIN_START_KEY;
24+
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
25+
import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_START_KEY;
2526
import static org.junit.Assert.assertEquals;
2627
import static org.junit.Assert.fail;
2728

@@ -53,9 +54,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
5354
ServerCallHandler<ReqT, RespT> next) {
5455
final var context =
5556
Context.current().withValues(
56-
Constants.WINDOW_START_TIME,
57+
GrpcServerUtils.WINDOW_START_TIME,
5758
headers.get(DATUM_METADATA_WIN_START),
58-
Constants.WINDOW_END_TIME,
59+
GrpcServerUtils.WINDOW_END_TIME,
5960
headers.get(DATUM_METADATA_WIN_END));
6061
return Contexts.interceptCall(context, call, headers, next);
6162
}

0 commit comments

Comments
 (0)