Skip to content

Commit 792d481

Browse files
authored
chore: fix gRPC config (#42)
Signed-off-by: Yashash H L <[email protected]>
1 parent 93c34ed commit 792d481

File tree

10 files changed

+79
-59
lines changed

10 files changed

+79
-59
lines changed

examples/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
<dependency>
1717
<groupId>io.numaproj.numaflow</groupId>
1818
<artifactId>numaflow-java</artifactId>
19-
<version>0.4.4</version>
19+
<version>0.4.5</version>
2020
</dependency>
2121
</dependencies>
2222

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>io.numaproj.numaflow</groupId>
88
<artifactId>numaflow-java</artifactId>
9-
<version>0.4.4</version>
9+
<version>0.4.5</version>
1010
<packaging>jar</packaging>
1111

1212
<name>numaflow-java</name>

src/main/java/io/numaproj/numaflow/common/GRPCServerConfig.java

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.numaproj.numaflow.function;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import io.numaproj.numaflow.info.ServerInfoConstants;
5+
import lombok.Getter;
6+
7+
@Getter
8+
public class FunctionGRPCConfig {
9+
private final String socketPath;
10+
private final int maxMessageSize;
11+
private String infoFilePath;
12+
13+
public FunctionGRPCConfig(int maxMessageSize) {
14+
this.socketPath = FunctionConstants.DEFAULT_SOCKET_PATH;
15+
this.maxMessageSize = maxMessageSize;
16+
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
17+
}
18+
19+
@VisibleForTesting
20+
public void setInfoFilePath(String infoFilePath) {
21+
this.infoFilePath = infoFilePath;
22+
}
23+
}

src/main/java/io/numaproj/numaflow/function/FunctionServer.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.netty.channel.epoll.EpollEventLoopGroup;
1414
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
1515
import io.netty.channel.unix.DomainSocketAddress;
16-
import io.numaproj.numaflow.common.GRPCServerConfig;
1716
import io.numaproj.numaflow.function.map.MapHandler;
1817
import io.numaproj.numaflow.function.mapt.MapTHandler;
1918
import io.numaproj.numaflow.function.reduce.ReduceHandler;
@@ -34,36 +33,36 @@
3433
@Slf4j
3534
public class FunctionServer {
3635

37-
private final GRPCServerConfig grpcServerConfig;
36+
private final FunctionGRPCConfig grpcConfig;
3837
private final ServerBuilder<?> serverBuilder;
3938
private final FunctionService functionService = new FunctionService();
4039
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
4140
private Server server;
4241

4342
public FunctionServer() {
44-
this(new GRPCServerConfig());
43+
this(new FunctionGRPCConfig(FunctionConstants.DEFAULT_MESSAGE_SIZE));
4544
}
4645

4746
/**
4847
* GRPC server constructor
4948
*
50-
* @param grpcServerConfig to configure the socket path and max message size for grpc
49+
* @param FunctionGRPCConfig to configure max message size for grpc
5150
*/
52-
public FunctionServer(GRPCServerConfig grpcServerConfig) {
53-
this(grpcServerConfig, new EpollEventLoopGroup());
51+
public FunctionServer(FunctionGRPCConfig grpcConfig) {
52+
this(grpcConfig, new EpollEventLoopGroup());
5453
}
5554

56-
public FunctionServer(GRPCServerConfig grpcServerConfig, EpollEventLoopGroup group) {
55+
public FunctionServer(FunctionGRPCConfig grpcConfig, EpollEventLoopGroup group) {
5756
this(NettyServerBuilder
58-
.forAddress(new DomainSocketAddress(grpcServerConfig.getSocketPath()))
57+
.forAddress(new DomainSocketAddress(grpcConfig.getSocketPath()))
5958
.channelType(EpollServerDomainSocketChannel.class)
60-
.maxInboundMessageSize(grpcServerConfig.getMaxMessageSize())
59+
.maxInboundMessageSize(grpcConfig.getMaxMessageSize())
6160
.workerEventLoopGroup(group)
62-
.bossEventLoopGroup(group), grpcServerConfig);
61+
.bossEventLoopGroup(group), grpcConfig);
6362
}
6463

65-
public FunctionServer(ServerBuilder<?> serverBuilder, GRPCServerConfig grpcServerConfig) {
66-
this.grpcServerConfig = grpcServerConfig;
64+
public FunctionServer(ServerBuilder<?> serverBuilder, FunctionGRPCConfig grpcConfig) {
65+
this.grpcConfig = grpcConfig;
6766
this.serverBuilder = serverBuilder;
6867
}
6968

@@ -86,8 +85,8 @@ public FunctionServer registerReducerFactory(ReducerFactory<? extends ReduceHand
8685
* Start serving requests.
8786
*/
8887
public void start() throws Exception {
89-
String socketPath = grpcServerConfig.getSocketPath();
90-
String infoFilePath = grpcServerConfig.getInfoFilePath();
88+
String socketPath = grpcConfig.getSocketPath();
89+
String infoFilePath = grpcConfig.getInfoFilePath();
9190
// cleanup socket path if it exists (unit test builder doesn't use one)
9291
if (socketPath != null) {
9392
Path path = Paths.get(socketPath);
@@ -130,7 +129,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
130129
// start server
131130
server.start();
132131
log.info(
133-
"Server started, listening on socket path: " + grpcServerConfig.getSocketPath());
132+
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
134133

135134
// register shutdown hook
136135
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.numaproj.numaflow.sink;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import io.numaproj.numaflow.info.ServerInfoConstants;
5+
import lombok.Getter;
6+
7+
@Getter
8+
public class SinkGRPCConfig {
9+
private final String socketPath;
10+
private final int maxMessageSize;
11+
private String infoFilePath;
12+
13+
public SinkGRPCConfig(int maxMessageSize) {
14+
this.socketPath = SinkConstants.DEFAULT_SOCKET_PATH;
15+
this.maxMessageSize = maxMessageSize;
16+
this.infoFilePath = ServerInfoConstants.DEFAULT_SERVER_INFO_FILE_PATH;
17+
}
18+
19+
@VisibleForTesting
20+
public void setInfoFilePath(String infoFilePath) {
21+
this.infoFilePath = infoFilePath;
22+
}
23+
}

src/main/java/io/numaproj/numaflow/sink/SinkHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.numaproj.numaflow.sink;
22

33
/**
4-
* SinkHandler exposes method for publishing messages to sink
4+
* SinkHandler exposes method for publishing messages to sink.
55
* Implementations should override the processMessage method
66
* which will be used for processing the input messages
77
*/

src/main/java/io/numaproj/numaflow/sink/SinkServer.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import io.netty.channel.epoll.EpollEventLoopGroup;
88
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
99
import io.netty.channel.unix.DomainSocketAddress;
10-
import io.numaproj.numaflow.common.GRPCServerConfig;
1110
import io.numaproj.numaflow.info.Language;
1211
import io.numaproj.numaflow.info.Protocol;
1312
import io.numaproj.numaflow.info.ServerInfo;
@@ -24,36 +23,36 @@
2423
@Slf4j
2524
public class SinkServer {
2625

27-
private final GRPCServerConfig grpcServerConfig;
26+
private final SinkGRPCConfig grpcConfig;
2827
private final ServerBuilder<?> serverBuilder;
2928
private final SinkService sinkService = new SinkService();
3029
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
3130
private Server server;
3231

3332
public SinkServer() {
34-
this(new GRPCServerConfig());
33+
this(new SinkGRPCConfig(SinkConstants.DEFAULT_MESSAGE_SIZE));
3534
}
3635

3736
/**
3837
* GRPC server constructor
3938
*
40-
* @param grpcServerConfig to configure the socket path and max message size for grpc
39+
* @param SinkGRPCConfig to configure the max message size for grpc
4140
*/
42-
public SinkServer(GRPCServerConfig grpcServerConfig) {
43-
this(grpcServerConfig, new EpollEventLoopGroup());
41+
public SinkServer(SinkGRPCConfig grpcConfig) {
42+
this(grpcConfig, new EpollEventLoopGroup());
4443
}
4544

46-
public SinkServer(GRPCServerConfig grpcServerConfig, EpollEventLoopGroup group) {
45+
public SinkServer(SinkGRPCConfig grpcConfig, EpollEventLoopGroup group) {
4746
this(NettyServerBuilder
48-
.forAddress(new DomainSocketAddress(grpcServerConfig.getSocketPath()))
47+
.forAddress(new DomainSocketAddress(grpcConfig.getSocketPath()))
4948
.channelType(EpollServerDomainSocketChannel.class)
50-
.maxInboundMessageSize(grpcServerConfig.getMaxMessageSize())
49+
.maxInboundMessageSize(grpcConfig.getMaxMessageSize())
5150
.workerEventLoopGroup(group)
52-
.bossEventLoopGroup(group), grpcServerConfig);
51+
.bossEventLoopGroup(group), grpcConfig);
5352
}
5453

55-
public SinkServer(ServerBuilder<?> serverBuilder, GRPCServerConfig grpcServerConfig) {
56-
this.grpcServerConfig = grpcServerConfig;
54+
public SinkServer(ServerBuilder<?> serverBuilder, SinkGRPCConfig grpcConfig) {
55+
this.grpcConfig = grpcConfig;
5756
this.serverBuilder = serverBuilder;
5857
}
5958

@@ -66,8 +65,8 @@ public SinkServer registerSinker(SinkHandler sinkHandler) {
6665
* Start serving requests.
6766
*/
6867
public void start() throws Exception {
69-
String socketPath = grpcServerConfig.getSocketPath();
70-
String infoFilePath = grpcServerConfig.getInfoFilePath();
68+
String socketPath = grpcConfig.getSocketPath();
69+
String infoFilePath = grpcConfig.getInfoFilePath();
7170
// cleanup socket path if it exists (unit test builder doesn't use one)
7271
if (socketPath != null) {
7372
Path path = Paths.get(socketPath);
@@ -93,7 +92,7 @@ public void start() throws Exception {
9392
// start server
9493
server.start();
9594
log.info(
96-
"Server started, listening on socket path: " + grpcServerConfig.getSocketPath());
95+
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
9796

9897
// register shutdown hook
9998
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

src/test/java/io/numaproj/numaflow/function/FunctionServerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import io.grpc.stub.MetadataUtils;
99
import io.grpc.stub.StreamObserver;
1010
import io.grpc.testing.GrpcCleanupRule;
11-
import io.numaproj.numaflow.common.GRPCServerConfig;
1211
import io.numaproj.numaflow.function.map.MapHandler;
1312
import io.numaproj.numaflow.function.mapt.MapTHandler;
1413
import io.numaproj.numaflow.function.v1.Udfunction;
@@ -45,7 +44,7 @@ public class FunctionServerTest {
4544
public void setUp() throws Exception {
4645
String serverName = InProcessServerBuilder.generateName();
4746

48-
GRPCServerConfig grpcServerConfig = new GRPCServerConfig();
47+
FunctionGRPCConfig grpcServerConfig = new FunctionGRPCConfig(FunctionConstants.DEFAULT_MESSAGE_SIZE);
4948
grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info");
5049
server = new FunctionServer(
5150
InProcessServerBuilder.forName(serverName).directExecutor(),

src/test/java/io/numaproj/numaflow/sink/SinkServerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import io.grpc.inprocess.InProcessServerBuilder;
77
import io.grpc.stub.StreamObserver;
88
import io.grpc.testing.GrpcCleanupRule;
9-
import io.numaproj.numaflow.common.GRPCServerConfig;
109
import io.numaproj.numaflow.sink.v1.Udsink;
1110
import io.numaproj.numaflow.sink.v1.UserDefinedSinkGrpc;
1211
import org.junit.After;
@@ -34,7 +33,7 @@ public class SinkServerTest {
3433
@Before
3534
public void setUp() throws Exception {
3635
String serverName = InProcessServerBuilder.generateName();
37-
GRPCServerConfig grpcServerConfig = new GRPCServerConfig();
36+
SinkGRPCConfig grpcServerConfig = new SinkGRPCConfig(SinkConstants.DEFAULT_MESSAGE_SIZE);
3837
grpcServerConfig.setInfoFilePath("/tmp/numaflow-test-server-info");
3938
server = new SinkServer(
4039
InProcessServerBuilder.forName(serverName).directExecutor(),

0 commit comments

Comments
 (0)