Skip to content

Commit ec03659

Browse files
authored
Merge pull request #157 from ywy2090/feature-milestone2-query-bug-fx
support mpc with psi
2 parents aaab545 + 08a00a2 commit ec03659

File tree

16 files changed

+405
-194
lines changed

16 files changed

+405
-194
lines changed

wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/DagWorkFlowSchedulerImpl.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
// import static com.webank.wedpr.components.scheduler.dag.utils.WorkerUtils.toJobWorker;
44

55
import com.webank.wedpr.common.utils.WeDPRException;
6+
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
67
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
8+
import com.webank.wedpr.components.project.dao.JobDO;
79
import com.webank.wedpr.components.scheduler.dag.api.WorkFlowScheduler;
810
import com.webank.wedpr.components.scheduler.dag.base.DAG;
911
import com.webank.wedpr.components.scheduler.dag.base.DAGNode;
1012
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
1113
import com.webank.wedpr.components.scheduler.dag.utils.WorkerUtils;
1214
import com.webank.wedpr.components.scheduler.dag.worker.*;
15+
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
1316
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
1417
import com.webank.wedpr.components.scheduler.workflow.WorkFlow;
1518
import com.webank.wedpr.components.scheduler.workflow.WorkFlowNode;
@@ -28,19 +31,24 @@ public class DagWorkFlowSchedulerImpl implements WorkFlowScheduler {
2831

2932
private LoadBalancer loadBalancer;
3033
private JobWorkerMapper jobWorkerMapper;
34+
private DatasetMapper datasetMapper;
3135
private FileStorageInterface fileStorageInterface;
36+
private FileMetaBuilder fileMetaBuilder;
3237

33-
// TODO: make it config items
3438
private final Integer workerRetryTimes = -1;
3539
private final Integer workerRetryDelayMillis = -1;
3640

3741
public DagWorkFlowSchedulerImpl(
3842
LoadBalancer loadBalancer,
3943
JobWorkerMapper jobWorkerMapper,
40-
FileStorageInterface storage) {
44+
DatasetMapper datasetMapper,
45+
FileStorageInterface fileStorageInterface,
46+
FileMetaBuilder fileMetaBuilder) {
4147
this.loadBalancer = loadBalancer;
4248
this.jobWorkerMapper = jobWorkerMapper;
43-
this.fileStorageInterface = storage;
49+
this.datasetMapper = datasetMapper;
50+
this.fileStorageInterface = fileStorageInterface;
51+
this.fileMetaBuilder = fileMetaBuilder;
4452
}
4553

4654
public LoadBalancer getLoadBalancer() {
@@ -81,10 +89,10 @@ public void saveOrUpdateJobWorker(JobWorker jobWorker) {
8189
}
8290

8391
@Override
84-
public void schedule(String jobId, WorkFlow workFlow) throws WeDPRException {
92+
public void schedule(String jobId, JobDO jobDO, WorkFlow workFlow) throws WeDPRException {
8593

8694
// prepare for dag
87-
List<Worker> workerList = prepareDag(jobId, workFlow);
95+
List<Worker> workerList = prepareDag(jobId, jobDO, workFlow);
8896

8997
// construct dag
9098
DAG<Integer> dag = createDag(jobId, workerList);
@@ -93,7 +101,8 @@ public void schedule(String jobId, WorkFlow workFlow) throws WeDPRException {
93101
executeDag(jobId, workerList, dag);
94102
}
95103

96-
public List<Worker> prepareDag(String jobId, WorkFlow workFlow) throws WeDPRException {
104+
public List<Worker> prepareDag(String jobId, JobDO jobDO, WorkFlow workFlow)
105+
throws WeDPRException {
97106

98107
List<Worker> workerList = new ArrayList<>();
99108

@@ -111,12 +120,15 @@ public List<Worker> prepareDag(String jobId, WorkFlow workFlow) throws WeDPRExce
111120
// build worker
112121
Worker worker =
113122
WorkerFactory.buildWorker(
123+
jobDO,
114124
jobWorker,
115125
workerRetryTimes,
116126
workerRetryDelayMillis,
117127
loadBalancer,
118128
jobWorkerMapper,
119-
fileStorageInterface);
129+
datasetMapper,
130+
fileStorageInterface,
131+
fileMetaBuilder);
120132
workerList.add(worker);
121133
}
122134

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.webank.wedpr.components.scheduler.dag.api;
22

33
import com.webank.wedpr.common.utils.WeDPRException;
4+
import com.webank.wedpr.components.project.dao.JobDO;
45
import com.webank.wedpr.components.scheduler.workflow.WorkFlow;
56

67
public interface WorkFlowScheduler {
7-
void schedule(String jobId, WorkFlow workFlow) throws WeDPRException;
8+
void schedule(String jobId, JobDO jobDO, WorkFlow workFlow) throws WeDPRException;
89
}

wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/utils/MpcResultFileResolver.java

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.webank.wedpr.components.scheduler.dag.utils;
22

3+
import com.webank.wedpr.common.utils.WeDPRException;
34
import java.io.BufferedReader;
45
import java.io.FileReader;
56
import java.io.FileWriter;
@@ -31,23 +32,24 @@ public class MpcResultFileResolver {
3132

3233
public static final String CSV_SEP = ",";
3334
public static final String BLANK_SEP = " ";
35+
public static final String MPC_ID_FIELD = "id";
3436

3537
public MpcResult doParseMpcResultFile(String mpcOutputFile, boolean onlyField)
36-
throws IOException {
38+
throws IOException, WeDPRException {
3739

38-
int valueCount = 0;
40+
int mpcResultRowCount = 0;
41+
int mpcResultFieldCount = 0;
3942

40-
int fieldCount = 0;
4143
boolean needAddFields = true;
4244
MpcResult mpcResult = new MpcResult();
43-
String strResultFields = "id";
45+
String strResultFields = MPC_ID_FIELD;
4446
try (BufferedReader mpcOutputBufferedReader =
4547
new BufferedReader(new FileReader(mpcOutputFile))) {
4648
String line;
4749
while ((line = mpcOutputBufferedReader.readLine()) != null) {
4850
line = line.trim();
4951
if (line.startsWith(PPC_RESULT_FIELDS_FLAG)) {
50-
strResultFields += ",";
52+
strResultFields += CSV_SEP;
5153
strResultFields +=
5254
line.substring(line.indexOf('=') + 1)
5355
.trim()
@@ -56,13 +58,13 @@ public MpcResult doParseMpcResultFile(String mpcOutputFile, boolean onlyField)
5658
mpcResult.setMpcResultFields(strResultFields);
5759
needAddFields = false;
5860
} else if (line.startsWith(PPC_RESULT_VALUES_FLAG)) {
59-
fieldCount = line.split("=")[1].trim().split(BLANK_SEP).length;
60-
logger.info("## {}:{}", PPC_RESULT_VALUES_FLAG, fieldCount);
61-
mpcResult.setMpcResultFieldCount(fieldCount);
61+
mpcResultFieldCount = line.split("=")[1].trim().split(BLANK_SEP).length;
62+
logger.info("## {}:{}", PPC_RESULT_VALUES_FLAG, mpcResultFieldCount);
63+
mpcResult.setMpcResultFieldCount(mpcResultFieldCount);
6264
if (onlyField) {
6365
break;
6466
}
65-
valueCount++;
67+
mpcResultRowCount++;
6668
} else if (line.startsWith(PPC_RESULT_TIME_FLAG)) {
6769
logger.info("## {}:{}", PPC_RESULT_TIME_FLAG, line);
6870
mpcResult.setMpcResultTimeLine(line);
@@ -79,19 +81,29 @@ public MpcResult doParseMpcResultFile(String mpcOutputFile, boolean onlyField)
7981
if (needAddFields) {
8082
StringBuilder stringBuilder = new StringBuilder();
8183
stringBuilder.append(strResultFields);
82-
for (int i = 0; i < fieldCount; i++) {
83-
stringBuilder.append("," + "result").append(i);
84+
for (int i = 0; i < mpcResultFieldCount; i++) {
85+
stringBuilder.append(CSV_SEP + "result").append(i);
8486
}
8587
mpcResult.setMpcResultFields(stringBuilder.toString());
88+
89+
logger.error(
90+
"Not found \"{}\" flag in mpc result file, fields: {}, mpcOutputFile: {}",
91+
PPC_RESULT_FIELDS_FLAG,
92+
stringBuilder,
93+
mpcOutputFile);
94+
95+
throw new WeDPRException(
96+
"Not found \"" + PPC_RESULT_FIELDS_FLAG + "\" flag in mpc result file");
8697
}
8798

88-
mpcResult.setMpcResultValueCount(valueCount);
89-
logger.info("mpc result: {}", mpcResult);
99+
mpcResult.setMpcResultValueCount(mpcResultRowCount);
100+
logger.info("## mpc result: {}", mpcResult);
90101
return mpcResult;
91102
}
92103

93104
public void transMpcOutputFile2ResultFile(
94-
String jobId, String mpcOutputFile, String mpcResultFile) throws IOException {
105+
String jobId, String mpcOutputFile, String mpcResultFile)
106+
throws IOException, WeDPRException {
95107

96108
long startTimeMillis = System.currentTimeMillis();
97109
logger.info(
@@ -102,7 +114,7 @@ public void transMpcOutputFile2ResultFile(
102114

103115
MpcResult mpcResult = doParseMpcResultFile(mpcOutputFile, true);
104116

105-
int rowCount = 0;
117+
int rowNumber = 0;
106118
try (BufferedReader mpcOutputBufferedReader =
107119
new BufferedReader(new FileReader(mpcOutputFile));
108120
FileWriter mpcResultFileWriter = new FileWriter(mpcResultFile);
@@ -124,19 +136,19 @@ public void transMpcOutputFile2ResultFile(
124136
continue;
125137
}
126138

127-
rowCount++;
139+
rowNumber++;
128140

129-
StringBuilder stringBuilder = new StringBuilder(String.valueOf(rowCount));
141+
StringBuilder stringBuilder = new StringBuilder(String.valueOf(rowNumber));
130142

131143
String[] valuesList = line.split("=")[1].trim().split(BLANK_SEP);
132144
for (String value : valuesList) {
133-
stringBuilder.append(",").append(value);
145+
stringBuilder.append(CSV_SEP).append(value);
134146
}
135147

136148
csvWriter.write(stringBuilder.toString());
137149

138150
if (logger.isTraceEnabled()) {
139-
logger.trace("result values: {}, index: {}", stringBuilder, rowCount);
151+
logger.trace("result values: {}, index: {}", stringBuilder, rowNumber);
140152
}
141153

142154
// add a newline at the end of each row
@@ -146,25 +158,12 @@ public void transMpcOutputFile2ResultFile(
146158
long endTimeMillis = System.currentTimeMillis();
147159

148160
logger.info(
149-
"trans mpc output file to mpc result file end, jobId: {}, mpcOutputFile: {}, mpcResultFile: {}, rowCount: {}, costMs: {}",
161+
"trans mpc output file to mpc result file end, jobId: {}, mpcOutputFile: {}, mpcResultFile: {}, rowNumber: {}, costMs: {}",
150162
jobId,
151163
mpcOutputFile,
152164
mpcResultFile,
153-
rowCount,
165+
rowNumber,
154166
endTimeMillis - startTimeMillis);
155167
}
156168
}
157-
158-
public static void main(String[] args) throws IOException {
159-
MpcResultFileResolver mpcResultFileResolver = new MpcResultFileResolver();
160-
// MpcResult mpcResult =
161-
// mpcResultFileResolver.doParseMpcResultFile(
162-
// "/Users/octopus/Desktop/mpc_result_1.txt", true);
163-
164-
mpcResultFileResolver.transMpcOutputFile2ResultFile(
165-
"",
166-
"/Users/octopus/Desktop/mpc_result_1.txt",
167-
"/Users/octopus/Desktop/mpc_result.csv");
168-
// System.out.println(mpcResult);
169-
}
170169
}

wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/ModelWorker.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
import com.webank.wedpr.common.config.WeDPRCommonConfig;
44
import com.webank.wedpr.common.protocol.ServiceName;
55
import com.webank.wedpr.common.utils.WeDPRException;
6+
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
67
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
8+
import com.webank.wedpr.components.project.dao.JobDO;
79
import com.webank.wedpr.components.scheduler.client.ModelClient;
810
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
11+
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
912
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
1013
import com.webank.wedpr.components.storage.api.FileStorageInterface;
1114
import com.webank.wedpr.sdk.jni.transport.model.ServiceMeta;
@@ -17,19 +20,25 @@ public class ModelWorker extends Worker {
1720
private static final Logger logger = LoggerFactory.getLogger(ModelWorker.class);
1821

1922
public ModelWorker(
23+
JobDO jobDO,
2024
JobWorker jobWorker,
2125
int workerRetryTimes,
2226
int workerRetryDelayMillis,
2327
LoadBalancer loadBalancer,
2428
JobWorkerMapper jobWorkerMapper,
25-
FileStorageInterface fileStorageInterface) {
29+
DatasetMapper datasetMapper,
30+
FileStorageInterface fileStorageInterface,
31+
FileMetaBuilder fileMetaBuilder) {
2632
super(
33+
jobDO,
2734
jobWorker,
2835
workerRetryTimes,
2936
workerRetryDelayMillis,
3037
loadBalancer,
3138
jobWorkerMapper,
32-
fileStorageInterface);
39+
datasetMapper,
40+
fileStorageInterface,
41+
fileMetaBuilder);
3342
}
3443

3544
@Override

wedpr-components/scheduler/src/main/java/com/webank/wedpr/components/scheduler/dag/worker/MpcWorker.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@
44
import com.webank.wedpr.common.utils.Common;
55
import com.webank.wedpr.common.utils.ObjectMapperFactory;
66
import com.webank.wedpr.common.utils.WeDPRException;
7+
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
78
import com.webank.wedpr.components.loadbalancer.LoadBalancer;
9+
import com.webank.wedpr.components.project.dao.JobDO;
810
import com.webank.wedpr.components.scheduler.client.MpcClient;
911
import com.webank.wedpr.components.scheduler.dag.entity.JobWorker;
1012
import com.webank.wedpr.components.scheduler.dag.utils.MpcResultFileResolver;
13+
import com.webank.wedpr.components.scheduler.executor.hook.MPCExecutorHook;
1114
import com.webank.wedpr.components.scheduler.executor.impl.ExecutorConfig;
15+
import com.webank.wedpr.components.scheduler.executor.impl.model.FileMetaBuilder;
16+
import com.webank.wedpr.components.scheduler.executor.impl.mpc.MPCJobParam;
1217
import com.webank.wedpr.components.scheduler.executor.impl.mpc.request.MpcRunJobRequest;
1318
import com.webank.wedpr.components.scheduler.mapper.JobWorkerMapper;
1419
import com.webank.wedpr.components.storage.api.FileStorageInterface;
@@ -24,19 +29,56 @@ public class MpcWorker extends Worker {
2429
private static final Logger logger = LoggerFactory.getLogger(MpcWorker.class);
2530

2631
public MpcWorker(
32+
JobDO jobDO,
2733
JobWorker jobWorker,
2834
int workerRetryTimes,
2935
int workerRetryDelayMillis,
3036
LoadBalancer loadBalancer,
3137
JobWorkerMapper jobWorkerMapper,
32-
FileStorageInterface fileStorageInterface) {
38+
DatasetMapper datasetMapper,
39+
FileStorageInterface fileStorageInterface,
40+
FileMetaBuilder fileMetaBuilder) {
3341
super(
42+
jobDO,
3443
jobWorker,
3544
workerRetryTimes,
3645
workerRetryDelayMillis,
3746
loadBalancer,
3847
jobWorkerMapper,
39-
fileStorageInterface);
48+
datasetMapper,
49+
fileStorageInterface,
50+
fileMetaBuilder);
51+
}
52+
53+
@SneakyThrows
54+
@Override
55+
public void onLaunch() {
56+
super.onLaunch();
57+
58+
long startTimeMillis = System.currentTimeMillis();
59+
JobDO jobDO = getJobDO();
60+
MPCJobParam mpcJobParam = (MPCJobParam) getJobDO().getJobParam();
61+
62+
logger.info(
63+
"## mpc engine launch begin, jobId: {}, mpcJobParams: {}",
64+
jobDO.getId(),
65+
mpcJobParam);
66+
67+
MPCExecutorHook mpcExecutorHook =
68+
new MPCExecutorHook(
69+
getDatasetMapper(), getFileStorageInterface(), getFileMetaBuilder());
70+
boolean needRunPsi = mpcJobParam.isNeedRunPsi();
71+
if (needRunPsi) {
72+
mpcExecutorHook.prepareWithPsi(getDatasetMapper(), jobDO, mpcJobParam);
73+
} else {
74+
mpcExecutorHook.prepareWithoutPsi(getDatasetMapper(), jobDO, mpcJobParam);
75+
}
76+
77+
long endTimeMillis = System.currentTimeMillis();
78+
logger.info(
79+
"## mpc engine launch end, jobId: {}, costMs: {}",
80+
jobDO.getId(),
81+
endTimeMillis - startTimeMillis);
4082
}
4183

4284
@Override

0 commit comments

Comments
 (0)