Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public class DatasetConfig {
@Value("${wedpr.storage.download.shardSize: 20971520}")
int shardSize;

/**
 * Period of the dataset status update timer; bound to {@code
 * wedpr.dataset.status.update.timer.period}, default 3600.
 *
 * <p>NOTE(review): the field name and its use in DatasetStatusUpdater suggest the unit is
 * seconds, although the property key itself carries no unit suffix - confirm.
 */
@Value("${wedpr.dataset.status.update.timer.period:3600}")
int datasetStatusUpdateTimerPeriodSec;

/**
 * Age threshold for the status update; bound to {@code
 * wedpr.dataset.status.update.interval.sec}, default 3600.
 *
 * <p>NOTE(review): appears to be passed to DatasetMapper.updateStatusByUpdateInterval as the
 * number of seconds a row may stay un-updated before its status is changed - confirm against
 * the mapper SQL.
 */
@Value("${wedpr.dataset.status.update.interval.sec:3600}")
int datasetStatusUpdateIntervalSec;

/**
 * Batch size for the status update; bound to {@code wedpr.dataset.status.update.limit.count},
 * default 100.
 *
 * <p>NOTE(review): appears to be the maximum number of rows changed per timer run (the SQL
 * LIMIT) - confirm against the mapper SQL.
 */
@Value("${wedpr.dataset.status.update.limit.count:100}")
int datasetStatusUpdateLimitCount;

@Value("${wedpr.dataset.sql.validation.pattern: ^(SELECT.*?)(?<!\\G)(;|$)}")
String sqlValidationPattern;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.webank.wedpr.components.dataset.config;

import com.webank.wedpr.components.dataset.service.DatasetStatusUpdater;
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that builds the {@link DatasetStatusUpdater} bean.
 *
 * <p>The updater is started inside the factory method, so its timer is already scheduled by the
 * time the bean is handed to the container.
 */
@Configuration
public class DatasetStatusUpdaterConfig {

    private static final Logger logger = LoggerFactory.getLogger(DatasetStatusUpdaterConfig.class);

    /** Mapper handed to the updater for the status-update statement. */
    @Autowired DatasetMapper datasetMapper;

    /** Configuration handed to the updater (timer period, interval and limit). */
    @Autowired DatasetConfig datasetConfig;

    /**
     * Creates the updater from the injected collaborators and starts it.
     *
     * @return the already-started updater, registered under the name {@code datasetStatusUpdater}
     */
    @Bean(name = "datasetStatusUpdater")
    public DatasetStatusUpdater newDatasetStatusUpdaterConfig() {
        logger.info("## Create DatasetStatusUpdater");
        final DatasetStatusUpdater updater =
                new DatasetStatusUpdater(this.datasetConfig, this.datasetMapper);
        updater.start();
        return updater;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ public void analyzeData() throws DatasetException {

int columnNum = fieldList.size();
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
if (rowNum > 1) {
// Note: minus field header
rowNum -= 1;
}

this.dataSourceProcessorContext.getDataset().setDatasetFields(fieldString);
this.dataSourceProcessorContext.getDataset().setDatasetColumnCount(columnNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public void analyzeData() throws DatasetException {

int columnNum = fieldList.size();
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
if (rowNum > 1) {
// Note: minus field header
rowNum -= 1;
}
String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
long fileSize = FileUtils.getFileSize(cvsFilePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public void analyzeData() throws DatasetException {

int columnNum = fieldList.size();
int rowNum = FileUtils.getFileLinesNumber(cvsFilePath);
if (rowNum > 1) {
// Note: minus field header
rowNum -= 1;
}
String md5Hash = FileUtils.calculateFileHash(cvsFilePath, "MD5");
long fileSize = FileUtils.getFileSize(cvsFilePath);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.webank.wedpr.components.dataset.service;

import com.webank.wedpr.components.dataset.config.DatasetConfig;
import com.webank.wedpr.components.db.mapper.dataset.mapper.DatasetMapper;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a periodic task that asks {@link DatasetMapper#updateStatusByUpdateInterval} to change the
 * status of datasets according to the configured interval and batch limit.
 *
 * <p>The task runs on a dedicated single-thread scheduler owned by this object. Call {@link
 * #start()} once to schedule it and {@link #shutdown()} to release the thread.
 */
public class DatasetStatusUpdater {

    private static final Logger logger = LoggerFactory.getLogger(DatasetStatusUpdater.class);

    /** How long {@link #shutdown()} waits for an in-flight run before forcing termination. */
    private static final long SHUTDOWN_WAIT_SEC = 5L;

    private final DatasetConfig datasetConfig;
    private final DatasetMapper datasetMapper;

    private final ScheduledExecutorService datasetStatusUpdateTimer =
            new ScheduledThreadPoolExecutor(1);

    /**
     * @param datasetConfig source of the timer period, update interval and update limit
     * @param datasetMapper mapper used to execute the status update
     */
    public DatasetStatusUpdater(DatasetConfig datasetConfig, DatasetMapper datasetMapper) {
        this.datasetConfig = datasetConfig;
        this.datasetMapper = datasetMapper;
    }

    /**
     * Schedules the status update to run immediately and then at a fixed rate.
     *
     * @throws IllegalArgumentException if the configured timer period is not positive
     * @throws java.util.concurrent.RejectedExecutionException if called after {@link #shutdown()}
     */
    public void start() {
        int datasetStatusUpdateTimerSec = datasetConfig.getDatasetStatusUpdateTimerPeriodSec();
        // scheduleAtFixedRate rejects period <= 0 with a message-less exception; fail with the
        // same exception type but say which setting is wrong.
        if (datasetStatusUpdateTimerSec <= 0) {
            throw new IllegalArgumentException(
                    "dataset status update timer period must be positive, value: "
                            + datasetStatusUpdateTimerSec);
        }
        logger.info(
                "start dataset status update timer, datasetStatusUpdateTimerSec: {}",
                datasetStatusUpdateTimerSec);
        datasetStatusUpdateTimer.scheduleAtFixedRate(
                new Runnable() {
                    @Override
                    public void run() {
                        // An exception escaping a periodic task cancels all later runs, so
                        // failures are logged and swallowed here.
                        try {
                            checkAndUpdateDatasetStatus();
                        } catch (Exception e) {
                            logger.warn("e: ", e);
                        }
                    }
                },
                0,
                datasetStatusUpdateTimerSec,
                TimeUnit.SECONDS);

        logger.info("start dataset status update timer success");
    }

    /**
     * Stops the timer and releases its worker thread.
     *
     * <p>Waits briefly for a run that is already in progress, then forces termination. Safe to
     * call more than once.
     *
     * <p>NOTE(review): when this object is exposed through a Spring {@code @Bean} method with the
     * default {@code destroyMethod}, Spring's destroy-method inference is expected to call this
     * public no-arg {@code shutdown()} on context close - confirm for the Spring version in use.
     */
    public void shutdown() {
        logger.info("stop dataset status update timer");
        datasetStatusUpdateTimer.shutdown();
        try {
            if (!datasetStatusUpdateTimer.awaitTermination(SHUTDOWN_WAIT_SEC, TimeUnit.SECONDS)) {
                datasetStatusUpdateTimer.shutdownNow();
            }
        } catch (InterruptedException e) {
            datasetStatusUpdateTimer.shutdownNow();
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Executes one status-update pass using the configured interval and limit, and logs the number
     * of rows the mapper reports as updated (INFO when non-zero, DEBUG otherwise).
     */
    public void checkAndUpdateDatasetStatus() {
        Integer updateIntervalSec = datasetConfig.getDatasetStatusUpdateIntervalSec();
        Integer updateLimitCount = datasetConfig.getDatasetStatusUpdateLimitCount();
        int updatedCount =
                datasetMapper.updateStatusByUpdateInterval(updateIntervalSec, updateLimitCount);
        if (updatedCount > 0) {
            logger.info(
                    "update dataset status, updateIntervalSec: {}, updateLimitCount: {}, count: {}",
                    updateIntervalSec,
                    updateLimitCount,
                    updatedCount);
        } else {
            logger.debug(
                    "update dataset status, updateIntervalSec: {}, updateLimitCount: {}, count: {}",
                    updateIntervalSec,
                    updateLimitCount,
                    updatedCount);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ public static void isSingleSelectStatement(String sql, String sqlValidationPatte
throw new DatasetException("only support single SQL statement, sql: " + sql);
}

sqlValidationPattern = sqlValidationPattern.trim();
if (sqlValidationPattern.isEmpty()) {
return;
}

// regular expression for matching a single SELECT statement.
// Pattern pattern =
// Pattern.compile(
// "^(SELECT.*?)(?<!\\G)(;|$)", Pattern.CASE_INSENSITIVE |
// Pattern.DOTALL);
// Pattern pattern =
// Pattern.compile(
// "^(SELECT.*?)(?<!\\G)(;|$)", Pattern.CASE_INSENSITIVE |
// Pattern.DOTALL);
Pattern pattern =
Pattern.compile(sqlValidationPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
Matcher matcher = pattern.matcher(sql);
Expand All @@ -81,7 +86,11 @@ public static void isSingleSelectStatement(String sql, String sqlValidationPatte
"only support single select SQL statement, sqlValidationPattern: {}, sql: {}",
sqlValidationPattern,
sql);
throw new DatasetException("only support single select SQL statement, sql: " + sql);
throw new DatasetException(
"only support single select SQL statement, validationPattern: "
+ sqlValidationPattern
+ " sql: "
+ sql);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ List<Dataset> queryVisibleDatasetsForUser(
@Param("endTime") String endTime,
@Param("status") Integer status);

/**
 * Updates the status of datasets selected by an age threshold, in a bounded batch.
 *
 * <p>NOTE(review): the selection and the new status value are defined by the mapper XML
 * statement with the same id, not by this declaration - consult it for the exact rule.
 *
 * @param updateIntervalSec age threshold in seconds, bound to {@code #{updateIntervalSec}}
 * @param updateLimitCount maximum number of rows to change in one call, bound to {@code
 *     #{updateLimitCount}}
 * @return the row count reported for the update statement
 */
int updateStatusByUpdateInterval(
@Param("updateIntervalSec") Integer updateIntervalSec,
@Param("updateLimitCount") Integer updateLimitCount);

/**
* insert dataset
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,17 @@
dataset_id = #{datasetId}
</update>

<!--
  Sets status to -1 on rows of wedpr_dataset whose status is neither -1 nor 0 and whose
  update_at lies more than #{updateIntervalSec} whole seconds before NOW().
  Rows are taken newest create_at first, and at most #{updateLimitCount} rows are changed
  per execution.
  NOTE(review): TIMESTAMPDIFF and UPDATE ... ORDER BY ... LIMIT are MySQL-style syntax;
  confirm the target database. The meaning of status values -1 and 0 is not visible here.
-->
<update id="updateStatusByUpdateInterval">
UPDATE
wedpr_dataset
SET
status = -1
WHERE
status != -1 AND status != 0 AND TIMESTAMPDIFF(SECOND, update_at, NOW()) > #{updateIntervalSec}
ORDER BY create_at DESC
LIMIT #{updateLimitCount}
</update>

<delete id="deleteDataset" parameterType="String">
DELETE FROM wedpr_dataset WHERE dataset_id = #{datasetId}
</delete>
Expand Down
Loading