Skip to content

Commit 9fc773a

Browse files
authored
Merge pull request #43 from zachma-820/feature-milestone2
添加PIR的向导模式
2 parents 9826dc3 + 7b05125 commit 9fc773a

File tree

14 files changed

+263
-130
lines changed

14 files changed

+263
-130
lines changed

wedpr-adm/conf/wedpr.properties

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ wedpr.executor.ml.connect.timeout.ms=5000
6161
wedpr.executor.ml.request.timeout.ms=60000
6262
wedpr.executor.ml.max.total.connection=5
6363

64-
6564
wedpr.executor.psi.url=
6665
wedpr.executor.psi.connect.request.timeout.ms=10000
6766
wedpr.executor.psi.connect.timeout.ms=5000
@@ -70,7 +69,11 @@ wedpr.executor.psi.max.total.connection=5
7069

7170
wedpr.executor.psi.token=
7271

73-
72+
wedpr.executor.pir.url=
73+
wedpr.executor.pir.connect.request.timeout.ms=10000
74+
wedpr.executor.pir.connect.timeout.ms=5000
75+
wedpr.executor.pir.request.timeout.ms=60000
76+
wedpr.executor.pir.max.total.connection=5
7477

7578

7679

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/dispatch/DataSourceProcessorDispatcher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ public DataSourceProcessor getDataSourceProcessor(String dataSourceType) {
5454
Class<?> aClass = dataSourcePreprocessorMap.get(upperCase);
5555
DataSourceProcessor dataSourceProcessor = null;
5656
try {
57-
dataSourceProcessor = (DataSourceProcessor) aClass.newInstance();
57+
// dataSourceProcessor = (DataSourceProcessor) aClass.newInstance();
58+
dataSourceProcessor =
59+
(DataSourceProcessor) aClass.getDeclaredConstructor().newInstance();
5860
} catch (Exception e) {
5961
logger.error("newInstance Exception, dataSourceType: {}, e: ", dataSourceType, e);
6062
throw new RuntimeException(e);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir;
2+
3+
import com.webank.wedpr.components.scheduler.executor.impl.pir.request.PirRequest;
4+
import com.webank.wedpr.components.scheduler.service.PirService;
5+
import com.webank.wedpr.components.token.auth.TokenUtils;
6+
import com.webank.wedpr.core.utils.Constant;
7+
import com.webank.wedpr.core.utils.WeDPRResponse;
8+
import javax.servlet.http.HttpServletRequest;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.web.bind.annotation.PostMapping;
13+
import org.springframework.web.bind.annotation.RequestBody;
14+
import org.springframework.web.bind.annotation.RequestMapping;
15+
import org.springframework.web.bind.annotation.RestController;
16+
17+
/**
18+
* @author zachma
19+
* @date 2024/9/5
20+
*/
21+
@RestController
22+
@RequestMapping(
23+
path = Constant.WEDPR_API_PREFIX + "/project/pir/",
24+
produces = {"application/json"})
25+
public class PirController {
26+
private static final Logger logger = LoggerFactory.getLogger(PirController.class);
27+
28+
@Autowired private PirService pirService;
29+
30+
@PostMapping("/create")
31+
public WeDPRResponse createPirProject(
32+
@RequestBody PirRequest pirRequest, HttpServletRequest request) {
33+
try {
34+
return pirService.createPirProject(
35+
TokenUtils.getLoginUser(request).getUsername(), pirRequest);
36+
} catch (Exception e) {
37+
logger.warn("createProject exception, error: ", e);
38+
return new WeDPRResponse(
39+
Constant.WEDPR_FAILED, "createProject failed for " + e.getMessage());
40+
}
41+
}
42+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir;
2+
3+
import com.webank.wedpr.components.http.client.HttpClientPool;
4+
import com.webank.wedpr.components.scheduler.executor.impl.pir.request.PirServiceRequest;
5+
import com.webank.wedpr.core.config.WeDPRConfig;
6+
import com.webank.wedpr.core.utils.Constant;
7+
import com.webank.wedpr.core.utils.WeDPRException;
8+
import org.apache.http.client.config.RequestConfig;
9+
import org.apache.http.client.methods.CloseableHttpResponse;
10+
import org.apache.http.client.methods.HttpPost;
11+
import org.apache.http.entity.StringEntity;
12+
import org.apache.http.impl.client.CloseableHttpClient;
13+
import org.apache.http.util.EntityUtils;
14+
15+
/**
16+
* @author zachma
17+
* @date 2024/9/5
18+
*/
19+
public class PirExecutor {
20+
private static final String PIR_URL = WeDPRConfig.apply("wedpr.executor.pir.url", null, true);
21+
22+
private static final Integer CONNECTION_REQUEST_TIME_OUT =
23+
WeDPRConfig.apply("wedpr.executor.pir.connect.request.timeout.ms", 10000);
24+
private static final Integer CONNECTION_TIME_OUT =
25+
WeDPRConfig.apply("wedpr.executor.pir.connect.timeout.ms", 5000);
26+
private static final Integer REQUEST_TIMEOUT =
27+
WeDPRConfig.apply("wedpr.executor.pir.request.timeout.ms", 60000);
28+
private static final Integer MAX_CONNECTION_TOTAL =
29+
WeDPRConfig.apply("wedpr.executor.pir.max.total.connection", 5);
30+
31+
private static final String CONTENT_TYPE_KEY = "Content-Type";
32+
private static final String DEFAULT_CONTENT_TYPE = "application/json";
33+
34+
private RequestConfig buildConfig() {
35+
return RequestConfig.custom()
36+
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIME_OUT)
37+
.setConnectTimeout(CONNECTION_TIME_OUT)
38+
.setSocketTimeout(REQUEST_TIMEOUT)
39+
.build();
40+
}
41+
42+
private String getPirUrl() {
43+
return PIR_URL;
44+
}
45+
46+
public String send(PirServiceRequest pirServiceRequest) throws WeDPRException {
47+
try {
48+
CloseableHttpClient httpClient =
49+
HttpClientPool.getHttpClient(MAX_CONNECTION_TOTAL, buildConfig());
50+
HttpPost httpPost = new HttpPost(getPirUrl());
51+
StringEntity requestEntity = new StringEntity(pirServiceRequest.serialize());
52+
httpPost.setEntity(requestEntity);
53+
httpPost.setHeader(CONTENT_TYPE_KEY, DEFAULT_CONTENT_TYPE);
54+
CloseableHttpResponse response = httpClient.execute(httpPost);
55+
if (response.getStatusLine().getStatusCode() != Constant.HTTP_SUCCESS) {
56+
throw new WeDPRException("任务发起失败");
57+
}
58+
return EntityUtils.toString(response.getEntity());
59+
} catch (Exception e) {
60+
throw new WeDPRException("发起任务失败");
61+
}
62+
}
63+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.request;
2+
3+
import lombok.Data;
4+
5+
/**
6+
* @author zachma
7+
* @date 2024/9/5
8+
*/
9+
@Data
10+
public class PirConfigBody {
11+
private String datasetId;
12+
private String[] exists;
13+
private String[] values;
14+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.request;
2+
3+
import java.util.List;
4+
import lombok.Data;
5+
6+
/**
7+
* @author zachma
8+
* @date 2024/9/5
9+
*/
10+
@Data
11+
public class PirRequest {
12+
private String datasetId;
13+
private String algorithmType;
14+
private List<String> searchIds;
15+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.request;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.webank.wedpr.core.utils.ObjectMapperFactory;
5+
import java.util.List;
6+
import lombok.Data;
7+
8+
/**
9+
* @author zachma
10+
* @date 2024/9/5
11+
*/
12+
@Data
13+
public class PirServiceRequest {
14+
private String algorithmType;
15+
private List<String> searchIds;
16+
private PirConfigBody serviceConfigBody;
17+
private String user;
18+
private String agency;
19+
20+
public String serialize() throws JsonProcessingException {
21+
return ObjectMapperFactory.getObjectMapper().writeValueAsString(this);
22+
}
23+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.response;
2+
3+
import lombok.Data;
4+
5+
/**
6+
* @author zachma
7+
* @date 2024/9/6
8+
*/
9+
@Data
10+
public class PirResultBody {
11+
String searchId;
12+
Boolean searchExist;
13+
String searchValue;
14+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.response;
2+
3+
import java.util.List;
4+
import lombok.Data;
5+
6+
/**
7+
* @author zachma
8+
* @date 2024/9/6
9+
*/
10+
@Data
11+
public class PirResultDetail {
12+
List<PirResultBody> detail;
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.webank.wedpr.components.scheduler.executor.impl.pir.response;
2+
3+
import lombok.Data;
4+
5+
/**
6+
* @author zachma
7+
* @date 2024/9/6
8+
*/
9+
@Data
10+
public class PirResultResponse {
11+
String jobType;
12+
PirResultDetail detail;
13+
}

0 commit comments

Comments
 (0)