Skip to content

Commit f05ce60

Browse files
authored
Merge pull request #9 from ywy2090/feature-milestone2-datasource
support mysql/dm/gauss/king/shentong database data source
2 parents 0591993 + 9672f7d commit f05ce60

28 files changed

+1030
-271
lines changed

wedpr-adm/conf/application-wedpr.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ wedpr.dataset.dataSourceType[4].children[0].label=MYSQL
5454
wedpr.dataset.dataSourceType[4].children[0].value=MYSQL
5555
wedpr.dataset.dataSourceType[4].children[1].label=\u8FBE\u68A6
5656
wedpr.dataset.dataSourceType[4].children[1].value=DM
57-
wedpr.dataset.dataSourceType[4].children[2].label=POSTFRESQL
58-
wedpr.dataset.dataSourceType[4].children[2].value=POSTFRESQL
57+
wedpr.dataset.dataSourceType[4].children[2].label=POSTGRESQL
58+
wedpr.dataset.dataSourceType[4].children[2].value=POSTGRESQL
5959
wedpr.dataset.dataSourceType[4].children[3].label=\u9AD8\u65AF
6060
wedpr.dataset.dataSourceType[4].children[3].value=GAUSS
6161
wedpr.dataset.dataSourceType[4].children[4].label=\u4EBA\u5927\u91D1\u4ED3

wedpr-components/dataset/build.gradle

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,40 @@ plugins {
44
id 'com.github.sherter.google-java-format'
55
}
66

7+
// Additional attribute definition
8+
ext {
9+
dmVersion = "8.1.2.192"
10+
postgresqlVersion = "42.5.6"
11+
kingVersion = "8.6.0"
12+
}
13+
714
dependencies {
815
compile project(":wedpr-core-utils")
916
compile project(":wedpr-components-uuid")
1017
compile project(":wedpr-components-mybatis")
1118
compile project(":wedpr-components-storage")
1219
compile project(":wedpr-components-authorization")
13-
// compile project(":wedpr-components-initializer")
1420
compile project(":wedpr-components-token-auth")
1521

1622
compile "org.apache.poi:poi:${poiVersion}"
1723
compile "org.apache.poi:poi-excelant:${poiVersion}"
1824
compile "com.opencsv:opencsv:${opencsvVersion}"
25+
26+
// da meng
27+
compile "com.dameng:DmJdbcDriver18:${dmVersion}"
28+
// compile "com.dameng:DmDialect-for-hibernate5.0:${dmVersion}"
29+
// compile 'org.hibernate:hibernate-core:5.3.20.Final'
30+
31+
// gauss
32+
compile "org.postgresql:postgresql:${postgresqlVersion}"
33+
34+
// king
35+
compile "cn.com.kingbase:kingbase8:${kingVersion}"
36+
// compile files('libs/hibernate-4.dialect.jar')
37+
38+
// shen tong
39+
compile files('libs/oscarJDBC8.jar')
40+
// compile files('libs/oscarHibernate54.jar')
1941
}
2042

2143
googleJavaFormat {
3.04 MB
Binary file not shown.

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/common/DatasetStatus.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ public enum DatasetStatus {
99
Success(0, "Success"),
1010
Failure(-1, "Failure"),
1111
Fatal(-2, "Fatal"),
12-
WaitingForUploadData(1, "WaitingForUploadData"),
13-
DataUploading(2, "DataUploading"),
14-
DataAnalyzing(3, "DataAnalyzing"),
15-
UploadingDataToStorage(4, "UploadingDataToStorage");
12+
InitialState(1, "InitialState"),
13+
WaitingForUploadData(2, "WaitingForUploadData"),
14+
WaitingForLoadDBData(3, "WaitingForLoadDBData"),
15+
DataUploading(4, "DataUploading"),
16+
DataAnalyzing(5, "DataAnalyzing"),
17+
UploadingDataToStorage(6, "UploadingDataToStorage");
1618

1719
private final Integer code;
1820
private final String message;

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DataSourceTypeConfig.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.util.List;
1010
import lombok.Data;
1111
import lombok.Getter;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
1214
import org.springframework.boot.context.properties.ConfigurationProperties;
1315
import org.springframework.context.annotation.Bean;
1416
import org.springframework.context.annotation.Configuration;
@@ -18,6 +20,8 @@
1820
@Getter
1921
public class DataSourceTypeConfig {
2022

23+
private static final Logger logger = LoggerFactory.getLogger(DataSourceTypeConfig.class);
24+
2125
@Data
2226
@JsonInclude(JsonInclude.Include.NON_NULL)
2327
public static class LabelValue {
@@ -36,7 +40,7 @@ public void validateDataSourceConfigurations() throws DatasetException {
3640

3741
String label = labelValue.getLabel();
3842
String value = labelValue.getValue();
39-
// boolean supportDynamicType = labelValue.isSupportDynamicType();
43+
4044
Common.requireNonEmpty("label", label);
4145
Common.requireNonEmpty("value", value);
4246

@@ -48,7 +52,8 @@ public void validateDataSourceConfigurations() throws DatasetException {
4852

4953
// check db types
5054
for (LabelValue childLabelValue : labelValue.getChildren()) {
51-
DBType.isSupportedDBType(childLabelValue.getValue());
55+
DBType dbType = DBType.fromStrType(childLabelValue.getValue());
56+
logger.info(" add one db type: {}", dbType);
5257
}
5358
}
5459
}

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*/
1616
package com.webank.wedpr.components.dataset.config;
1717

18+
import static com.webank.wedpr.components.dataset.service.ChunkUploadImpl.UPLOAD_CHUNK_FILE_NAME_PREFIX;
19+
20+
import com.webank.wedpr.components.dataset.common.DatasetConstant;
1821
import lombok.Data;
1922
import org.springframework.beans.factory.annotation.Value;
2023
import org.springframework.context.annotation.Configuration;
@@ -42,4 +45,27 @@ public class DatasetConfig {
4245

4346
@Value("${wedpr.storage.download.shardSize: 20971520}")
4447
int shardSize;
48+
49+
// ${largeFileDataDir}/dataset/
50+
public String getDatasetBaseDir() {
51+
return String.format("%s/%s", largeFileDataDir, DatasetConstant.DATASET_LABEL);
52+
}
53+
54+
// ${largeFileDataDir}/dataset/${datasetId}
55+
public String getDatasetDir(String datasetId) {
56+
String datasetBaseDir = getDatasetBaseDir();
57+
return String.format("%s/%s", datasetBaseDir, datasetId);
58+
}
59+
60+
// ${largeFileDataDir}/dataset/chunks/
61+
public String getDatasetChunksBaseDir() {
62+
String datasetBaseDir = getDatasetBaseDir();
63+
return String.format("%s/%s", datasetBaseDir, UPLOAD_CHUNK_FILE_NAME_PREFIX);
64+
}
65+
66+
// ${largeFileDataDir}/dataset/chunks/${datasetId}
67+
public String getDatasetChunksDir(String datasetId) {
68+
String datasetChunksBaseDir = getDatasetChunksBaseDir();
69+
return String.format("%s/%s", datasetChunksBaseDir, datasetId);
70+
}
4571
}

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetInitializer.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,30 @@ public class DatasetInitializer {
1919
@Bean
2020
public void createLargeFileDataDir() throws IOException {
2121
String largeFileDataDir = datasetConfig.getLargeFileDataDir();
22+
String datasetBaseDir = datasetConfig.getDatasetBaseDir();
2223

23-
File file = new File(largeFileDataDir);
24-
if (!file.exists()) {
25-
Files.createDirectories(file.toPath());
26-
logger.info(" => create large file data dir, largeFileDataDir: {}", largeFileDataDir);
27-
} else {
28-
logger.info(
29-
" => large file data dir has been exist, largeFileDataDir: {}",
30-
largeFileDataDir);
24+
{
25+
File file = new File(largeFileDataDir);
26+
if (!file.exists()) {
27+
Files.createDirectories(file.toPath());
28+
logger.info(
29+
" => create large file data dir, largeFileDataDir: {}", largeFileDataDir);
30+
} else {
31+
logger.info(
32+
" => large file data dir has been exist, largeFileDataDir: {}",
33+
largeFileDataDir);
34+
}
35+
}
36+
37+
{
38+
File file = new File(datasetBaseDir);
39+
if (!file.exists()) {
40+
Files.createDirectories(file.toPath());
41+
logger.info(" => create dataset base dir, datasetBaseDir: {}", datasetBaseDir);
42+
} else {
43+
logger.info(
44+
" => dataset base dir has been exist, datasetBaseDir: {}", datasetBaseDir);
45+
}
3146
}
3247
}
3348
}

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/controller/ChunkUploadController.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import com.webank.wedpr.components.dataset.dao.Dataset;
77
import com.webank.wedpr.components.dataset.dao.FileChunk;
88
import com.webank.wedpr.components.dataset.dao.UserInfo;
9+
import com.webank.wedpr.components.dataset.datasource.DataSourceType;
910
import com.webank.wedpr.components.dataset.datasource.category.UploadChunkDataSource;
1011
import com.webank.wedpr.components.dataset.datasource.dispatch.DataSourceProcessorDispatcher;
1112
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessor;
1213
import com.webank.wedpr.components.dataset.datasource.processor.DataSourceProcessorContext;
1314
import com.webank.wedpr.components.dataset.exception.DatasetException;
1415
import com.webank.wedpr.components.dataset.mapper.DatasetMapper;
1516
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetTransactionalWrapper;
17+
import com.webank.wedpr.components.dataset.mapper.wapper.DatasetWrapper;
1618
import com.webank.wedpr.components.dataset.message.MergeChunkRequest;
1719
import com.webank.wedpr.components.dataset.service.ChunkUploadApi;
1820
import com.webank.wedpr.components.dataset.sync.api.DatasetSyncerApi;
@@ -63,6 +65,8 @@ public class ChunkUploadController {
6365

6466
@Autowired private DatasetTransactionalWrapper datasetTransactionalWrapper;
6567

68+
@Autowired private DatasetWrapper datasetWrapper;
69+
6670
@Autowired
6771
@Qualifier("datasetAsyncExecutor")
6872
private Executor executor;
@@ -99,25 +103,25 @@ public WeDPRResponse uploadChunkData(
99103
}
100104

101105
// get DataSourceProcessor for the dataset
102-
String dataSourceType = dataset.getDataSourceType();
106+
String strDataSourceType = dataset.getDataSourceType();
103107
DataSourceProcessor dataSourceProcessor =
104-
dataSourceProcessorDispatcher.getDataSourceProcessor(dataSourceType);
108+
dataSourceProcessorDispatcher.getDataSourceProcessor(strDataSourceType);
105109
if (dataSourceProcessor == null) {
106110
logger.error(
107111
"Unsupported data source type, datasetId: {}, dataSourceType: {}",
108112
datasetId,
109-
dataSourceType);
110-
throw new DatasetException("Unsupported data source type: " + dataSourceType);
113+
strDataSourceType);
114+
throw new DatasetException("Unsupported data source type: " + strDataSourceType);
111115
}
112116

113-
if (!dataSourceProcessor.isSupportUploadChunkData()) {
117+
if (!DataSourceType.isUploadDataSource(strDataSourceType)) {
114118
logger.error(
115119
"the data source does not support chunks upload, datasetId: {}, dataSourceType: {}",
116120
datasetId,
117-
dataSourceType);
121+
strDataSourceType);
118122
throw new DatasetException(
119123
"the data source does not support chunks upload, data source type: "
120-
+ dataSourceType);
124+
+ strDataSourceType);
121125
}
122126

123127
int status = dataset.getStatus();
@@ -202,25 +206,25 @@ public WeDPRResponse mergeChunkData(
202206
}
203207

204208
// get DataSourceProcessor for the dataset
205-
String dataSourceType = dataset.getDataSourceType();
209+
String strDataSourceType = dataset.getDataSourceType();
206210
DataSourceProcessor dataSourceProcessor =
207-
dataSourceProcessorDispatcher.getDataSourceProcessor(dataSourceType);
211+
dataSourceProcessorDispatcher.getDataSourceProcessor(strDataSourceType);
208212
if (dataSourceProcessor == null) {
209213
logger.error(
210214
"Unsupported data source type, datasetId: {}, dataSourceType: {}",
211215
datasetId,
212-
dataSourceType);
213-
throw new DatasetException("Unsupported data source type: " + dataSourceType);
216+
strDataSourceType);
217+
throw new DatasetException("Unsupported data source type: " + strDataSourceType);
214218
}
215219

216-
if (!dataSourceProcessor.isSupportUploadChunkData()) {
220+
if (!DataSourceType.isUploadDataSource(strDataSourceType)) {
217221
logger.error(
218222
"the data source does not support chunks upload, datasetId: {}, dataSourceType: {}",
219223
datasetId,
220-
dataSourceType);
224+
strDataSourceType);
221225
throw new DatasetException(
222226
"the data source does not support chunks upload, data source type: "
223-
+ dataSourceType);
227+
+ strDataSourceType);
224228
}
225229

226230
// transfer mergeChunkRequest to UploadChunkDataSource
@@ -247,12 +251,22 @@ public WeDPRResponse mergeChunkData(
247251
.fileStorage(fileStorage)
248252
.build();
249253

250-
dataSourceProcessor.processData(context);
254+
try {
255+
dataSourceProcessor.setContext(context);
256+
dataSourceProcessor.processData(context);
257+
dataset.setStatus(DatasetStatus.Success.getCode());
258+
dataset.setStatusDesc(DatasetStatus.Success.getMessage());
251259

252-
boolean success = context.isSuccess();
253-
if (success) {
254-
// sync to others
255260
datasetSyncer.syncCreateDataset(userInfo, dataset);
261+
} catch (Exception e) {
262+
dataset.setStatus(DatasetStatus.Failure.getCode());
263+
dataset.setStatusDesc(
264+
DatasetStatus.Failure.getMessage() + ":" + e.getMessage());
265+
} finally {
266+
try {
267+
datasetWrapper.updateMeta2DB(dataset);
268+
} catch (DatasetException ignore) {
269+
}
256270
}
257271
});
258272

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/DBType.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,39 @@
11
package com.webank.wedpr.components.dataset.datasource;
22

33
import com.webank.wedpr.components.dataset.exception.DatasetException;
4+
import lombok.Getter;
45

56
// supported database types
7+
@Getter
68
public enum DBType {
7-
MYSQL,
8-
DM,
9-
GAUSS, //
10-
KING, // KING_BASE,
11-
SHENTONG,
12-
POSTFRESQL;
9+
// jdbc:mysql://127.0.0.1:3306/[#databaseSchema]?serverTimezone=GMT%2B8&characterEncoding=UTF-8&connectTimeout=60000&socketTimeout=60000
10+
MYSQL("mysql", "mysql"),
11+
DM("dm", "dm"), // jdbc:dm://db_ip:db_port?genKeyNameCase=0
12+
GAUSS("gauss", "postgresql"), // jdbc:postgresql://db_ip:db_port/db_name
13+
KING("king", "kingbase8"), // KING_BASE, jdbc:kingbase8://db_ip:db_port/db_name
14+
SHENTONG("shentong", "oscar"), // jdbc:oscar://db_ip:db_port/db_name
15+
POSTGRESQL("postgresql", "postgresql"); // jdbc:postgresql://db_ip:db_port/db_name
1316
// NOTE: Add more db type
1417

15-
public static void isSupportedDBType(String strType) throws DatasetException {
18+
private final String type;
19+
private final String jdbcProtocol;
20+
21+
DBType(String type, String jdbcProtocol) {
22+
this.type = type;
23+
this.jdbcProtocol = jdbcProtocol;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return "DBType{" + "type='" + type + '\'' + ", jdbcProtocol='" + jdbcProtocol + '\'' + '}';
29+
}
30+
31+
public static DBType fromStrType(String strType) throws DatasetException {
1632
DBType[] values = DBType.values();
1733
for (DBType dbType : values) {
18-
String name = dbType.name();
19-
if (name.equalsIgnoreCase(strType)) {
20-
return;
34+
String type = dbType.getType();
35+
if (type.equalsIgnoreCase(strType)) {
36+
return dbType;
2137
}
2238
}
2339

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
package com.webank.wedpr.components.dataset.datasource;
22

3-
public interface DataSourceMeta {}
3+
public interface DataSourceMeta {
4+
// dynamic data source or not
5+
default boolean dynamicDataSource() {
6+
return false;
7+
}
8+
}

0 commit comments

Comments
 (0)