Skip to content

Commit bf88956

Browse files
authored
feat: bidirectional streaming (#23)
Signed-off-by: Yashash H L <[email protected]>
1 parent f6eeb2e commit bf88956

File tree

9 files changed

+108
-143
lines changed

9 files changed

+108
-143
lines changed

src/main/java/io/numaproj/numaflow/function/FunctionService.java

Lines changed: 6 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,16 @@ public StreamObserver<Udfunction.Datum> reduceFn(final StreamObserver<Udfunction
164164
create a supervisor actor which assign the tasks to child actors.
165165
we create a child actor for every key in a window.
166166
*/
167-
ActorRef parentActorRef = actorSystem
168-
.actorOf(ReduceSupervisorActor.props(reducerFactory, md, shutdownActorRef));
167+
ActorRef supervisorActor = actorSystem
168+
.actorOf(ReduceSupervisorActor.props(reducerFactory, md, shutdownActorRef, responseObserver));
169169

170170

171171
return new StreamObserver<Udfunction.Datum>() {
172172
@Override
173173
public void onNext(Udfunction.Datum datum) {
174174
// send the message to parent actor, which takes care of distribution.
175-
if (!parentActorRef.isTerminated()) {
176-
parentActorRef.tell(datum, parentActorRef);
175+
if (!supervisorActor.isTerminated()) {
176+
supervisorActor.tell(datum, supervisorActor);
177177
} else {
178178
responseObserver.onError(new Throwable("Supervisor actor was terminated"));
179179
}
@@ -187,21 +187,8 @@ public void onError(Throwable throwable) {
187187

188188
@Override
189189
public void onCompleted() {
190-
191-
// Ask the parent to return the list of futures returned by child actors.
192-
Future<Object> resultFuture = Patterns.ask(parentActorRef, EOF, Integer.MAX_VALUE);
193-
194-
List<Future<Object>> udfResultFutures;
195-
try {
196-
udfResultFutures = (List<Future<Object>>) Await.result(
197-
resultFuture,
198-
Duration.Inf());
199-
} catch (TimeoutException | InterruptedException e) {
200-
responseObserver.onError(e);
201-
return;
202-
}
203-
204-
extractResult(udfResultFutures, responseObserver, parentActorRef);
190+
// indicate the end of input to the supervisor
191+
supervisorActor.tell(EOF, ActorRef.noSender());
205192

206193
}
207194
};
@@ -243,52 +230,6 @@ private Udfunction.DatumList buildDatumListResponse(MessageT[] messageTs) {
243230
return datumListBuilder.build();
244231
}
245232

246-
/*
247-
extracts and returns the result to the observer.
248-
*/
249-
private void extractResult(
250-
List<Future<Object>> futureList,
251-
StreamObserver<Udfunction.DatumList> responseObserver,
252-
ActorRef supervisorActorRef) {
253-
Udfunction.DatumList.Builder responseBuilder = Udfunction.DatumList.newBuilder();
254-
255-
// build the response when its completed.
256-
Futures
257-
.sequence(futureList, actorSystem.dispatcher())
258-
.onComplete(new OnComplete<>() {
259-
@Override
260-
public void onComplete(
261-
Throwable failure,
262-
Iterable<Object> success) {
263-
264-
// if there are any failures indicate it to the observer.
265-
if (failure != null) {
266-
log.error("Error while getting output from actors - {}"
267-
, failure.getMessage());
268-
269-
responseObserver.onError(failure);
270-
return;
271-
}
272-
273-
success.forEach(ele -> {
274-
Udfunction.DatumList list = buildDatumListResponse((Message[]) ele);
275-
responseBuilder.addAllElements(list.getElementsList());
276-
});
277-
Udfunction.DatumList response = responseBuilder.build();
278-
279-
responseObserver.onNext(response);
280-
responseObserver.onCompleted();
281-
282-
/*
283-
once the result is returned we can stop the supervisor actor.
284-
stopping the supervisor will stop all its child actors.
285-
we should explicitly stop the actors for it to be garbage collected.
286-
*/
287-
actorSystem.stop(supervisorActorRef);
288-
}
289-
}, actorSystem.dispatcher());
290-
}
291-
292233
// log the exception and exit if there are any uncaught exceptions.
293234
private void handleFailure(CompletableFuture<Void> failureFuture) {
294235
new Thread(() -> {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.numaproj.numaflow.function.reduce;
2+
3+
import io.numaproj.numaflow.function.v1.Udfunction;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Getter;
6+
/*
7+
used to store the reduced result from the handler
8+
*/
9+
@Getter
10+
@AllArgsConstructor
11+
public class ActorResponse {
12+
String key;
13+
Udfunction.DatumList datumList;
14+
}

src/main/java/io/numaproj/numaflow/function/reduce/ReduceActor.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@
33
import akka.actor.AbstractActor;
44
import akka.actor.Props;
55
import akka.japi.pf.ReceiveBuilder;
6+
import com.google.protobuf.ByteString;
67
import io.numaproj.numaflow.function.HandlerDatum;
8+
import io.numaproj.numaflow.function.Message;
79
import io.numaproj.numaflow.function.metadata.Metadata;
10+
import io.numaproj.numaflow.function.v1.Udfunction;
811
import lombok.AllArgsConstructor;
9-
import lombok.NoArgsConstructor;
1012
import lombok.extern.slf4j.Slf4j;
1113

14+
import java.util.Arrays;
15+
1216
/**
1317
* Reduce actor invokes the user defined code and returns the result.
1418
*/
1519

1620
@Slf4j
1721
@AllArgsConstructor
18-
@NoArgsConstructor
1922
public class ReduceActor extends AbstractActor {
2023

2124
private String key;
@@ -40,7 +43,20 @@ private void invokeHandler(HandlerDatum handlerDatum) {
4043
}
4144

4245
private void getResult(String eof) {
43-
getSender().tell(this.groupBy.getOutput(key, md), getSelf());
46+
Message[] resultMessages = this.groupBy.getOutput(key, md);
47+
// send the result back to sender(parent actor)
48+
getSender().tell(buildDatumListResponse(resultMessages), getSelf());
49+
}
50+
51+
private ActorResponse buildDatumListResponse(Message[] messages) {
52+
Udfunction.DatumList.Builder datumListBuilder = Udfunction.DatumList.newBuilder();
53+
Arrays.stream(messages).forEach(message -> {
54+
datumListBuilder.addElements(Udfunction.Datum.newBuilder()
55+
.setKey(message.getKey())
56+
.setValue(ByteString.copyFrom(message.getValue()))
57+
.build());
58+
});
59+
return new ActorResponse(this.key, datumListBuilder.build());
4460
}
4561

4662
}

src/main/java/io/numaproj/numaflow/function/reduce/ReduceSupervisorActor.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import akka.actor.SupervisorStrategy;
88
import akka.japi.pf.DeciderBuilder;
99
import akka.japi.pf.ReceiveBuilder;
10-
import akka.pattern.Patterns;
1110
import com.google.common.base.Preconditions;
11+
import io.grpc.stub.StreamObserver;
1212
import io.numaproj.numaflow.function.Function;
1313
import io.numaproj.numaflow.function.FunctionService;
1414
import io.numaproj.numaflow.function.HandlerDatum;
@@ -19,7 +19,6 @@
1919
import scala.collection.Iterable;
2020
import scala.concurrent.Future;
2121

22-
import java.lang.reflect.InvocationTargetException;
2322
import java.time.Instant;
2423
import java.util.ArrayList;
2524
import java.util.HashMap;
@@ -36,23 +35,27 @@ public class ReduceSupervisorActor extends AbstractActor {
3635
private final ReducerFactory<? extends Reducer> reducerFactory;
3736
private final Metadata md;
3837
private final ActorRef shutdownActor;
38+
private final StreamObserver<Udfunction.DatumList> responseObserver;
3939
private final Map<String, ActorRef> actorsMap = new HashMap<>();
4040
private final List<Future<Object>> results = new ArrayList<>();
4141

4242
public ReduceSupervisorActor(
4343
ReducerFactory<? extends Reducer> reducerFactory,
4444
Metadata md,
45-
ActorRef shutdownActor) {
45+
ActorRef shutdownActor,
46+
StreamObserver<Udfunction.DatumList> responseObserver) {
4647
this.reducerFactory = reducerFactory;
4748
this.md = md;
4849
this.shutdownActor = shutdownActor;
50+
this.responseObserver = responseObserver;
4951
}
5052

5153
public static Props props(
5254
ReducerFactory<? extends Reducer> reducerFactory,
5355
Metadata md,
54-
ActorRef shutdownActor) {
55-
return Props.create(ReduceSupervisorActor.class, reducerFactory, md, shutdownActor);
56+
ActorRef shutdownActor,
57+
StreamObserver<Udfunction.DatumList> responseObserver) {
58+
return Props.create(ReduceSupervisorActor.class, reducerFactory, md, shutdownActor, responseObserver);
5659
}
5760

5861
// if there is an uncaught exception stop in the supervisor actor, send a signal to shut down
@@ -65,7 +68,7 @@ public void preRestart(Throwable reason, Optional<Object> message) {
6568

6669
@Override
6770
public SupervisorStrategy supervisorStrategy() {
68-
return new ReduceSupervisorStratergy();
71+
return new ReduceSupervisorStrategy();
6972
}
7073

7174

@@ -81,15 +84,20 @@ public Receive createReceive() {
8184
.create()
8285
.match(Udfunction.Datum.class, this::invokeActors)
8386
.match(String.class, this::sendEOF)
87+
.match(ActorResponse.class, this::responseListener)
8488
.build();
8589
}
8690

91+
/*
92+
based on key of the input message invoke the right actor
93+
if there is no actor for an incoming key, create a new actor
94+
track all the child actors using actors map
95+
*/
8796
private void invokeActors(Udfunction.Datum datum) {
8897
if (!actorsMap.containsKey(datum.getKey())) {
8998

9099
Reducer reducer = reducerFactory.createReducer();
91100

92-
93101
ActorRef actorRef = getContext()
94102
.actorOf(ReduceActor.props(datum.getKey(), md, reducer));
95103

@@ -101,10 +109,25 @@ private void invokeActors(Udfunction.Datum datum) {
101109

102110
private void sendEOF(String EOF) {
103111
for (Map.Entry<String, ActorRef> entry : actorsMap.entrySet()) {
104-
results.add(Patterns.ask(entry.getValue(), EOF, Integer.MAX_VALUE));
112+
entry.getValue().tell(EOF, getSelf());
113+
}
114+
}
115+
116+
// listen to child actors for the result.
117+
private void responseListener(ActorResponse actorResponse) {
118+
/*
119+
send the result back to the client
120+
remove the child entry from the map after getting result.
121+
if there are no entries in the map, that means processing is
122+
done we can close the stream.
123+
*/
124+
125+
responseObserver.onNext(actorResponse.getDatumList());
126+
actorsMap.remove(actorResponse.getKey());
127+
if (actorsMap.isEmpty()) {
128+
responseObserver.onCompleted();
129+
getContext().getSystem().stop(getSelf());
105130
}
106-
actorsMap.clear();
107-
getSender().tell(results, getSelf());
108131
}
109132

110133
private HandlerDatum constructHandlerDatum(Udfunction.Datum datum) {
@@ -123,7 +146,7 @@ private HandlerDatum constructHandlerDatum(Udfunction.Datum datum) {
123146
actors will be restarted, but we want to escalate the exception and terminate
124147
the system.
125148
*/
126-
private final class ReduceSupervisorStratergy extends SupervisorStrategy {
149+
private final class ReduceSupervisorStrategy extends SupervisorStrategy {
127150

128151
@Override
129152
public PartialFunction<Throwable, Directive> decider() {

src/main/proto/function/v1/udfunction.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ service UserDefinedFunction {
1717
rpc MapTFn(Datum) returns (DatumList);
1818

1919
// ReduceFn applies a reduce function to a datum stream.
20-
rpc ReduceFn(stream Datum) returns (DatumList);
20+
rpc ReduceFn(stream Datum) returns (stream DatumList);
2121

2222
// IsReady is the heartbeat endpoint for gRPC.
2323
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void reducerWithOneKey() {
165165
String expectedKey = reduceKey + REDUCE_PROCESSED_KEY_SUFFIX;
166166
// sum of first 10 numbers 1 to 10 -> 55
167167
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
168-
while (outputStreamObserver.resultDatum.get() == null) ;
168+
while (!outputStreamObserver.completed.get());
169169

170170
assertEquals(1, outputStreamObserver.resultDatum.get().getElementsCount());
171171
assertEquals(expectedKey, outputStreamObserver.resultDatum.get().getElements(0).getKey());
@@ -211,8 +211,7 @@ public void reducerWithMultipleKey() {
211211
// sum of first 10 numbers 1 to 10 -> 55
212212
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
213213

214-
while (outputStreamObserver.resultDatum.get() == null) ;
215-
214+
while (!outputStreamObserver.completed.get());
216215
Udfunction.DatumList result = outputStreamObserver.resultDatum.get();
217216
assertEquals(100, result.getElementsCount());
218217
for (int i = 0; i < keyCount; i++) {

src/test/java/io/numaproj/numaflow/function/ReduceOutputStreamObserver.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@
88

99
@Slf4j
1010
public class ReduceOutputStreamObserver implements StreamObserver<Udfunction.DatumList> {
11-
public AtomicReference<Udfunction.DatumList> resultDatum = new AtomicReference<>();
11+
public AtomicReference<Boolean> completed = new AtomicReference<>(false);
12+
public AtomicReference<Udfunction.DatumList> resultDatum = new AtomicReference<>(Udfunction.DatumList.newBuilder().build());
1213
public Throwable t;
1314

1415
@Override
1516
public void onNext(Udfunction.DatumList datum) {
16-
resultDatum.set(datum);
17+
resultDatum.set(resultDatum.get().toBuilder().addAllElements(datum.getElementsList()).build());
1718
}
1819

1920
@Override
@@ -24,5 +25,6 @@ public void onError(Throwable throwable) {
2425
@Override
2526
public void onCompleted() {
2627
log.info("on completed executed");
28+
this.completed.set(true);
2729
}
2830
}

0 commit comments

Comments
 (0)