Skip to content

Commit 257c153

Browse files
authored
feat: introducing factory for reducer (#22)
Signed-off-by: Yashash H L <[email protected]>
1 parent 62b1f0b commit 257c153

25 files changed

+235
-114
lines changed

.DS_Store

-6 KB
Binary file not shown.

examples/src/main/java/io/numaproj/numaflow/examples/function/evenodd/EvenOddFunction.java renamed to examples/src/main/java/io/numaproj/numaflow/examples/function/map/evenodd/EvenOddFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.numaproj.numaflow.examples.function.evenodd;
1+
package io.numaproj.numaflow.examples.function.map.evenodd;
22

33
import io.numaproj.numaflow.function.Datum;
44
import io.numaproj.numaflow.function.FunctionServer;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.numaproj.numaflow.examples.function.eventtimefilter;
1+
package io.numaproj.numaflow.examples.function.map.eventtimefilter;
22

33
import io.numaproj.numaflow.function.Datum;
44
import io.numaproj.numaflow.function.FunctionServer;

examples/src/main/java/io/numaproj/numaflow/examples/function/flatmap/FlatMapFunction.java renamed to examples/src/main/java/io/numaproj/numaflow/examples/function/map/flatmap/FlatMapFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.numaproj.numaflow.examples.function.flatmap;
1+
package io.numaproj.numaflow.examples.function.map.flatmap;
22

33
import io.numaproj.numaflow.function.Datum;
44
import io.numaproj.numaflow.function.FunctionServer;

examples/src/main/java/io/numaproj/numaflow/examples/function/forward/ForwardFunction.java renamed to examples/src/main/java/io/numaproj/numaflow/examples/function/map/forward/ForwardFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.numaproj.numaflow.examples.function.forward;
1+
package io.numaproj.numaflow.examples.function.map.forward;
22

33
import io.numaproj.numaflow.function.Datum;
44
import io.numaproj.numaflow.function.FunctionServer;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.numaproj.numaflow.examples.function.reduce.count;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
6+
@Getter
7+
@AllArgsConstructor
8+
public class Config {
9+
private int oddIncrementBy;
10+
private int evenIncrementBy;
11+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.numaproj.numaflow.examples.function.reduce.count;
2+
3+
import io.numaproj.numaflow.function.Datum;
4+
import io.numaproj.numaflow.function.FunctionServer;
5+
import io.numaproj.numaflow.function.Message;
6+
import io.numaproj.numaflow.function.metadata.Metadata;
7+
import io.numaproj.numaflow.function.reduce.Reducer;
8+
import io.numaproj.numaflow.function.reduce.ReducerFactory;
9+
import lombok.AllArgsConstructor;
10+
import lombok.extern.slf4j.Slf4j;
11+
12+
import java.io.IOException;
13+
import java.util.Objects;
14+
15+
@Slf4j
16+
@AllArgsConstructor
17+
public class EvenOddCounterFactory extends ReducerFactory<EvenOddCounterFactory.EvenOddCounter> {
18+
private Config config;
19+
20+
@Override
21+
public EvenOddCounter createReducer() {
22+
return new EvenOddCounter(config);
23+
}
24+
25+
@Slf4j
26+
public static class EvenOddCounter extends Reducer {
27+
private final Config config;
28+
private int evenCount;
29+
private int oddCount;
30+
31+
public EvenOddCounter(Config config) {
32+
this.config = config;
33+
}
34+
35+
@Override
36+
public void addMessage(String key, Datum datum, Metadata md) {
37+
try {
38+
int val = Integer.parseInt(new String(datum.getValue()));
39+
// increment based on the value specified in the config
40+
if (val % 2 == 0) {
41+
evenCount += config.getEvenIncrementBy();
42+
} else {
43+
oddCount += config.getOddIncrementBy();
44+
}
45+
} catch (NumberFormatException e) {
46+
log.info("error while parsing integer - {}", e.getMessage());
47+
}
48+
}
49+
50+
@Override
51+
public Message[] getOutput(String key, Metadata md) {
52+
log.info(
53+
"even and odd count - {} {}, window - {} {}",
54+
evenCount,
55+
oddCount,
56+
md.getIntervalWindow().getStartTime().toString(),
57+
md.getIntervalWindow().getEndTime().toString());
58+
59+
if (Objects.equals(key, "even")) {
60+
return new Message[]{Message.to(key, String.valueOf(evenCount).getBytes())};
61+
} else {
62+
return new Message[]{Message.to(key, String.valueOf(oddCount).getBytes())};
63+
}
64+
}
65+
}
66+
67+
public static void main(String[] args) throws IOException {
68+
log.info("counter udf was invoked");
69+
Config config = new Config(1, 2);
70+
new FunctionServer().registerReducerFactory(new EvenOddCounterFactory(config)).start();
71+
}
72+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.numaproj.numaflow.examples.function.reduce.sum;
2+
3+
import io.numaproj.numaflow.function.FunctionServer;
4+
import io.numaproj.numaflow.function.reduce.ReducerFactory;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.io.IOException;
8+
9+
@Slf4j
10+
public class SumFactory extends ReducerFactory<SumFunction> {
11+
12+
public static void main(String[] args) throws IOException {
13+
log.info("sum udf was invoked");
14+
new FunctionServer().registerReducerFactory(new SumFactory()).start();
15+
}
16+
17+
@Override
18+
public SumFunction createReducer() {
19+
return new SumFunction();
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,19 @@
1-
package io.numaproj.numaflow.examples.function.sum;
1+
package io.numaproj.numaflow.examples.function.reduce.sum;
22

33
import io.numaproj.numaflow.function.Datum;
4-
import io.numaproj.numaflow.function.FunctionServer;
54
import io.numaproj.numaflow.function.Message;
65
import io.numaproj.numaflow.function.metadata.Metadata;
76
import io.numaproj.numaflow.function.reduce.Reducer;
87
import lombok.extern.slf4j.Slf4j;
98

10-
import java.io.IOException;
119

1210
@Slf4j
1311
public class SumFunction extends Reducer {
1412

1513
private int sum = 0;
1614

17-
public SumFunction(String key, Metadata metadata) {
18-
super(key, metadata);
19-
}
20-
21-
public static void main(String[] args) throws IOException {
22-
log.info("counter udf was invoked");
23-
new FunctionServer().registerReducer(SumFunction.class).start();
24-
}
25-
2615
@Override
27-
public void addMessage(Datum datum) {
16+
public void addMessage(String key, Datum datum, Metadata md) {
2817
try {
2918
sum += Integer.parseInt(new String(datum.getValue()));
3019
} catch (NumberFormatException e) {
@@ -33,7 +22,7 @@ public void addMessage(Datum datum) {
3322
}
3423

3524
@Override
36-
public Message[] getOutput() {
25+
public Message[] getOutput(String key, Metadata md) {
3726
return new Message[]{Message.toAll(String.valueOf(sum).getBytes())};
3827
}
3928
}

src/main/java/io/numaproj/numaflow/function/FunctionServer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.numaproj.numaflow.function.map.MapHandler;
1717
import io.numaproj.numaflow.function.mapt.MapTHandler;
1818
import io.numaproj.numaflow.function.reduce.Reducer;
19+
import io.numaproj.numaflow.function.reduce.ReducerFactory;
1920
import lombok.extern.slf4j.Slf4j;
2021

2122
import java.io.IOException;
@@ -69,8 +70,8 @@ public FunctionServer registerMapperT(MapTHandler mapTHandler) {
6970
return this;
7071
}
7172

72-
public FunctionServer registerReducer(Class<? extends Reducer> groupByClass) {
73-
this.functionService.setReduceHandler(groupByClass);
73+
public FunctionServer registerReducerFactory(ReducerFactory<? extends Reducer> reducerFactory) {
74+
this.functionService.setReduceHandler(reducerFactory);
7475
return this;
7576
}
7677

0 commit comments

Comments
 (0)