Skip to content

Commit 0c34c3c

Browse files
authored
Merge pull request #133 from ywy2090/feature-milestone2
fix db datasource upload bug
2 parents d52b72f + efeed51 commit 0c34c3c

File tree

9 files changed

+146
-5
lines changed

9 files changed

+146
-5
lines changed

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/config/DatasetConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ public class DatasetConfig {
5050
@Value("${wedpr.storage.download.shardSize: 20971520}")
5151
int shardSize;
5252

53+
/** Period, in seconds, at which the dataset status update timer fires (default 3600). */
@Value("${wedpr.dataset.status.update.timer.period:3600}")
54+
int datasetStatusUpdateTimerPeriodSec;
55+
56+
/**
 * Age threshold, in seconds, passed to the status-update statement as updateIntervalSec and
 * compared there against the time elapsed since a dataset's update_at (default 3600).
 */
@Value("${wedpr.dataset.status.update.interval.sec:3600}")
57+
int datasetStatusUpdateIntervalSec;
58+
59+
/**
 * Maximum number of rows touched by one status-update run; passed to the statement as its
 * LIMIT (default 100).
 */
@Value("${wedpr.dataset.status.update.limit.count:100}")
60+
int datasetStatusUpdateLimitCount;
61+
5362
@Value("${wedpr.dataset.sql.validation.pattern: ^(SELECT.*?)(?<!\\G)(;|$)}")
5463
String sqlValidationPattern;
5564

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.webank.wedpr.components.dataset.config;
2+
3+
import com.webank.wedpr.components.dataset.service.DatasetStatusUpdater;
4+
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
11+
@Configuration
12+
public class DatasetStatusUpdaterConfig {
13+
14+
private static final Logger logger = LoggerFactory.getLogger(DatasetStatusUpdaterConfig.class);
15+
16+
@Autowired DatasetMapper datasetMapper;
17+
18+
@Autowired DatasetConfig datasetConfig;
19+
20+
@Bean(name = "datasetStatusUpdater")
21+
public DatasetStatusUpdater newDatasetStatusUpdaterConfig() {
22+
logger.info("## Create DatasetStatusUpdater");
23+
DatasetStatusUpdater datasetStatusUpdater =
24+
new DatasetStatusUpdater(datasetConfig, datasetMapper);
25+
datasetStatusUpdater.start();
26+
return datasetStatusUpdater;
27+
}
28+
}

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/CsvDataSourceProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ public void analyzeData() throws DatasetException {
139139

140140
int columnNum = fieldList.size();
141141
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
142+
if (rowNum > 1) {
143+
// Note: minus field header
144+
rowNum -= 1;
145+
}
142146

143147
this.dataSourceProcessorContext.getDataset().setDatasetFields(fieldString);
144148
this.dataSourceProcessorContext.getDataset().setDatasetColumnCount(columnNum);

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/DBDataSourceProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ public void analyzeData() throws DatasetException {
157157

158158
int columnNum = fieldList.size();
159159
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
160+
if (rowNum > 1) {
161+
// Note: minus field header
162+
rowNum -= 1;
163+
}
160164
String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
161165
long fileSize = FileUtils.getFileSize(cvsFilePath);
162166

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/datasource/processor/HdfsDataSourceProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ public void analyzeData() throws DatasetException {
129129

130130
int columnNum = fieldList.size();
131131
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
132+
if (rowNum > 1) {
133+
// Note: minus field header
134+
rowNum -= 1;
135+
}
132136
String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
133137
long fileSize = FileUtils.getFileSize(cvsFilePath);
134138

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package com.webank.wedpr.components.dataset.service;
2+
3+
import com.webank.wedpr.components.dataset.config.DatasetConfig;
4+
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
5+
import java.util.concurrent.ScheduledExecutorService;
6+
import java.util.concurrent.ScheduledThreadPoolExecutor;
7+
import java.util.concurrent.TimeUnit;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
public class DatasetStatusUpdater {
12+
13+
private static final Logger logger = LoggerFactory.getLogger(DatasetStatusUpdater.class);
14+
15+
private final DatasetConfig datasetConfig;
16+
private final DatasetMapper datasetMapper;
17+
18+
private final ScheduledExecutorService datasetStatusUpdateTimer =
19+
new ScheduledThreadPoolExecutor(1);
20+
21+
public DatasetStatusUpdater(DatasetConfig datasetConfig, DatasetMapper datasetMapper) {
22+
this.datasetConfig = datasetConfig;
23+
this.datasetMapper = datasetMapper;
24+
}
25+
26+
public void start() {
27+
int datasetStatusUpdateTimerSec = datasetConfig.getDatasetStatusUpdateTimerPeriodSec();
28+
logger.info(
29+
"start dataset status update timer, datasetStatusUpdateTimerSec: {}",
30+
datasetStatusUpdateTimerSec);
31+
datasetStatusUpdateTimer.scheduleAtFixedRate(
32+
new Runnable() {
33+
@Override
34+
public void run() {
35+
try {
36+
checkAndUpdateDatasetStatus();
37+
} catch (Exception e) {
38+
logger.warn("e: ", e);
39+
}
40+
}
41+
},
42+
0,
43+
datasetConfig.getDatasetStatusUpdateTimerPeriodSec(),
44+
TimeUnit.SECONDS);
45+
46+
logger.info("start dataset status update timer success");
47+
}
48+
49+
public void checkAndUpdateDatasetStatus() {
50+
Integer updateIntervalSec = datasetConfig.getDatasetStatusUpdateIntervalSec();
51+
Integer updateLimitCount = datasetConfig.getDatasetStatusUpdateLimitCount();
52+
int i = datasetMapper.updateStatusByUpdateInterval(updateIntervalSec, updateLimitCount);
53+
if (i > 0) {
54+
logger.info(
55+
"update dataset status, updateIntervalSec: {}, updateLimitCount: {}, count: {}",
56+
updateIntervalSec,
57+
updateLimitCount,
58+
i);
59+
} else {
60+
logger.debug(
61+
"update dataset status, updateIntervalSec: {}, updateLimitCount: {}, count: {}",
62+
updateIntervalSec,
63+
updateLimitCount,
64+
i);
65+
}
66+
}
67+
}

wedpr-components/dataset/src/main/java/com/webank/wedpr/components/dataset/sqlutils/SQLUtils.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,16 @@ public static void isSingleSelectStatement(String sql, String sqlValidationPatte
6666
throw new DatasetException("only support single SQL statement, sql: " + sql);
6767
}
6868

69+
sqlValidationPattern = sqlValidationPattern.trim();
70+
if (sqlValidationPattern.isEmpty()) {
71+
return;
72+
}
73+
6974
// regular expression for matching a single SELECT statement.
70-
// Pattern pattern =
71-
// Pattern.compile(
72-
// "^(SELECT.*?)(?<!\\G)(;|$)", Pattern.CASE_INSENSITIVE |
73-
// Pattern.DOTALL);
75+
// Pattern pattern =
76+
// Pattern.compile(
77+
// "^(SELECT.*?)(?<!\\G)(;|$)", Pattern.CASE_INSENSITIVE |
78+
// Pattern.DOTALL);
7479
Pattern pattern =
7580
Pattern.compile(sqlValidationPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
7681
Matcher matcher = pattern.matcher(sql);
@@ -81,7 +86,11 @@ public static void isSingleSelectStatement(String sql, String sqlValidationPatte
8186
"only support single select SQL statement, sqlValidationPattern: {}, sql: {}",
8287
sqlValidationPattern,
8388
sql);
84-
throw new DatasetException("only support single select SQL statement, sql: " + sql);
89+
throw new DatasetException(
90+
"only support single select SQL statement, validationPattern: "
91+
+ sqlValidationPattern
92+
+ " sql: "
93+
+ sql);
8594
}
8695
}
8796
}

wedpr-components/db-mapper/dataset/src/main/java/com/webank/wedpr/components/db/mapper/dataset/mapper/DatasetMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ List<Dataset> queryVisibleDatasetsForUser(
4646
@Param("endTime") String endTime,
4747
@Param("status") Integer status);
4848

49+
/**
 * Update the status of datasets selected by how long ago they were last updated.
 *
 * <p>NOTE(review): the statement mapped to this id in DatasetMapper.xml sets status to -1 for
 * rows whose status is neither -1 nor 0 and whose update_at is more than updateIntervalSec
 * seconds in the past; keep this description in sync with that SQL.
 *
 * @param updateIntervalSec age threshold in seconds, compared against the time since update_at
 * @param updateLimitCount maximum number of rows to update in one call (used as the LIMIT)
 * @return presumably the number of rows affected, as reported by the mapper framework
 */
50+
int updateStatusByUpdateInterval(
51+
@Param("updateIntervalSec") Integer updateIntervalSec,
52+
@Param("updateLimitCount") Integer updateLimitCount);
53+
4954
/**
5055
* insert dataset
5156
*

wedpr-components/db-mapper/dataset/src/main/resources/mapper/DatasetMapper.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,17 @@
260260
dataset_id = #{datasetId}
261261
</update>
262262

263+
<!--
    Sets status to -1 on datasets whose status is neither -1 nor 0 and whose update_at is more
    than #{updateIntervalSec} seconds old. At most #{updateLimitCount} rows are changed per
    call, most recently created first.
-->
<update id="updateStatusByUpdateInterval">
    UPDATE wedpr_dataset
    SET status = -1
    WHERE
        status NOT IN (-1, 0)
        AND TIMESTAMPDIFF(SECOND, update_at, NOW()) > #{updateIntervalSec}
    ORDER BY create_at DESC
    LIMIT #{updateLimitCount}
</update>
273+
263274
<delete id="deleteDataset" parameterType="String">
264275
DELETE FROM wedpr_dataset WHERE dataset_id = #{datasetId}
265276
</delete>

0 commit comments

Comments
 (0)