一个规则驱动的数据处理与发布系统,通过 YAML 描述任务,运行期按规则动态装配算子,并以虚拟线程(或回退普通线程)并发处理批数据。
/pipeline/model
: 流水线通用接口Pipeline/Collector/Processor/Publisher/Stage
、MapProcessor
/pipeline
: 流水线实现DTSPipeline
: 单条数据串联执行(I→T→O)DTSPipelineBatch
: 批数据并发执行(I→List→List)
pipeline/impl
: 工厂与执行器OperatorFactory
: 按 type+config 创建算子DTSExecutor
: 虚拟线程执行器(回退功能暂时有问题,不影响使用)
pipeline/impl/*
: 各层算子(collect/clean/calculate/publish)pipeline/task
: 任务加载与注册YamlTaskLoader/TaskAutoLoader/TaskRegistry/TaskDefinition/FlowDefinition/OperatorStep
src/main/java/com/ml/datatransforemer/dts/service
: 服务层DTSService/DTSServiceImpl
: 解析任务 → 调OperatorFactory
→ 组装 Pipeline → 用DTSExecutor
执行
src/main/resources/task.yaml
: 任务配置示例
在 application.yml
配置任务文件路径和数据库:
dts:
taskPath: ${your path}
- 请求示例(DTSRequest):
{
"ruleId": "${your-task-name}",
"payload": "{\"\“}"
}
- 启动:
mvn spring-boot:run
- 接口:
POST /api/dts/execute
(Body 为 DTSRequest)
- 流控与重试:对单条处理失败的策略(continue/stop/retry),配合监控发布。
- 热更新:新增
ReloadController
提供/api/dts/tasks/reload
触发YamlTaskLoader.load()
。 - 类型通道统一:约定上下文键(如
rows/payload/ruleId
),便于算子通信。
- 这个项目现网原本是go语言,这里为了开源使用AI工具辅助重写,如有发现Bug可以提ISSUE提醒我完善
- 项目核心设计大致如readme所示,有业务需要可以在未来进行自行拓展