动态任务补偿处理
一、本章介绍
当 Spring Event 接收到事件消息后,MQ 发送或 HTTP 调用均存在失败风险。常见原因包括网络超时、服务不可用、线程阻塞、流量高峰等。
因此,在 MQ 与 HTTP 处理完成后,应立即更新数据库任务表的状态(成功或失败),随后由定时扫描任务执行补偿逻辑,从而保障最终一致性。
二、功能流程
如图,定时扫描任务流程图,以及在 http、mq 通知完成后,也要调用 dao 做变更状态操作;

首先,任务补偿机制本质是一个定时任务扫描任务表的过程。为提升整体扫描效率,我们设计了“门牌号”分片策略:支持配置多个补偿任务,每个任务仅负责扫描自身门牌号范围内的数据。此外,扫描时会先根据条件查询符合要求的最小 ID,随后通过 > id LIMIT x 分批获取待处理的数据列表。
其次,对前面的流程做进一步补充:HTTP 调用和 MQ 推送操作完成后,必须立即更新数据库任务表的记录状态(成功或失败)。需要特别说明的是,从 Spring Event 接收到事件消息开始,执行通知操作(HTTP/MQ)再到更新数据库,这些步骤均不在同一事务中。也就是说,从原始事务(业务数据 + 任务表写入)提交之后的所有环节,都存在失败可能。因此引入任务补偿时,就会面临重复执行的风险,例如 HTTP 被重复调用、MQ 被重复发送。为此,在与业务方对接时,必须要求对方接口实现幂等设计(如以 OrderId 作为唯一索引或业务幂等键)。
三、编码实现
1. 工程结构

- 以
TaskMessageEventJob为入口,根据LocalTaskMessageAutoProperties中的任务配置拆分任务,并按门牌号范围扫描数据。随后调用新增的ILocalTaskMessageDataService服务,完成数据扫描以获取待处理的任务数据,并发送Spring Event事件。事件发送后,继续执行之前实现的监听流程。 - 在
HTTPNotifyStrategy和RabbitMQNotifyStrategy两个处理流程中,补充更新数据库任务表的状态操作(无论成功或失败均需更新)。
2. 功能开发
2.1 数据操作(Dao)
public interface ITaskMessageDao {
/**
* 插入任务消息
* @param taskMessagePO 任务消息PO对象
* @return 影响行数
*/
int insert(TaskMessagePO taskMessagePO) throws SQLException;
/**
* 根据任务ID修改状态
* @param taskId 任务ID
* @param status 状态(0-待处理,1-处理中,2-已完成,3-失败)
* @return 影响行数
*/
int updateStatusByTaskId(String taskId, Integer status);
/**
* 根据门牌号查询任务消息列表
* @param houseNumbers 门牌号列表
* @param id 查询ID大于此值的记录
* @param limit 限制返回结果数量
* @return 任务消息列表
*/
List<TaskMessagePO> selectByHouseNumber(List<Integer> houseNumbers, Long id, Integer limit);
/**
* 根据门牌号查询符合条件的最小ID
* @param houseNumbers 门牌号列表
* @return 最小ID,如果没有找到则返回null
*/
Long selectMinIdByHouseNumber(List<Integer> houseNumbers);
}- 这里需要增加三个方法,updateStatusByTaskId、selectByHouseNumber、selectMinIdByHouseNumber,分别用于更新状态、查询符合门牌号的数据、获取符合条件的最小ID,从而进一步缩小扫描范围。
2.2 数据更新(notify)
在 HTTP 调用、MQ 推送,以后要根据结果做一个更库表记录的操作。
2.2.1 http 调用处理
@Slf4j
@Component("httpNotifyStrategy")
public class HTTPNotifyStrategy implements INotifyStrategy {
@Resource
private ILocalTaskMessagePort port;
@Resource
private ILocalTaskMessageRepository repository;
@Override
public String notify(TaskMessageEntityCommand command) throws Exception {
try {
String result = port.notify2http(command);
// 通知成功,更新状态为成功
repository.updateTaskStatusToSuccess(command.getTaskId());
return result;
} catch (Exception e) {
log.error("http notify error {}", JSON.toJSONString(command), e);
// 通知失败,更新状态为失败
repository.updateTaskStatusToFailed(command.getTaskId());
throw e;
}
}
}2.2.2 mq 推送处理
@Slf4j
@Component("rabbitMQNotifyStrategy")
public class RabbitMQNotifyStrategy implements INotifyStrategy {
@Resource
private ILocalTaskMessagePort port;
@Resource
private ILocalTaskMessageRepository repository;
@Override
public String notify(TaskMessageEntityCommand command) throws Exception {
try {
String result = port.notify2rabbitmq(command);
// 通知成功,更新状态为成功
repository.updateTaskStatusToSuccess(command.getTaskId());
return result;
} catch (Exception e) {
log.error("rabbitmq notify error {}", JSON.toJSONString(command), e);
// 通知失败,更新状态为失败
repository.updateTaskStatusToFailed(command.getTaskId());
throw e;
}
}
}3. 扫描任务
由于不同的任务可以扫描不同的门牌号,因此需要增加一个自定义属性配置,允许用户根据自身需求拆分出多个任务。
@Data
@ConfigurationProperties(prefix = "cactusli.wrench.task.config", ignoreInvalidFields = true)
public class LocalTaskMessageAutoProperties {
/**
* 任务组配置列表
*/
private List<TaskGroupConfig> groups = new ArrayList<>();
@Data
public static class TaskGroupConfig {
/**
* 任务组ID或名称,用于日志区分
*/
private String groupId = "default";
/**
* 扫描的门牌号列表,例如 [1,2,3]
*/
private List<Integer> houseNumbers = new ArrayList<>();
/**
* 调度 cron 表达式,例如:"0/10 * * * * ?" 表示每10秒
*/
private String cron;
/**
* 固定延迟毫秒;若配置此项则使用固定延迟调度
*/
private Long fixedDelayMs;
/**
* 每次批量处理限制条数
*/
private Integer limit = 100;
}
}application.yaml 配置案例:
cactusli:
wrench:
task:
config:
groups:
# 使用 cron 表达式调度(每 10 秒执行一次)
- groupId: group01
houseNumbers: [1, 2, 3]
cron: "0/10 * * * * ?"
limit: 100
# 使用固定延迟调度(每 5 秒执行一次)
- groupId: group02
houseNumbers: [4, 5]
fixedDelayMs: 5000
limit: 50- 我们可以采用这种方式配置一个执行任务组,其中每个任务都可以指定扫描的门牌号。
@Slf4j
@Component
public class TaskMessageEventJob {
/**
* 记录每个任务组最近一次处理的最大ID
*/
private final Map<String, AtomicLong> groupLastIdMap = new ConcurrentHashMap<>();
private final LocalTaskMessageAutoProperties properties;
private final ThreadPoolTaskScheduler scheduler;
private final ILocalTaskMessageNotifyService notifyService;
private final ILocalTaskMessageDataService dataService;
public TaskMessageEventJob(LocalTaskMessageAutoProperties properties, ThreadPoolTaskScheduler scheduler, ILocalTaskMessageNotifyService notifyService, ILocalTaskMessageDataService dataService) {
this.properties = properties;
this.scheduler = scheduler;
this.notifyService = notifyService;
this.dataService = dataService;
}
@PostConstruct
public void init() {
List<LocalTaskMessageAutoProperties.TaskGroupConfig> groups = properties.getGroups();
if (groups == null || groups.isEmpty()) {
log.info("TaskMessageEventJob 未配置任务组,跳过调度初始化");
return;
}
for (LocalTaskMessageAutoProperties.TaskGroupConfig group : groups) {
scheduleGroup(group);
}
}
private void scheduleGroup(LocalTaskMessageAutoProperties.TaskGroupConfig group) {
String groupId = group.getGroupId();
List<Integer> houseNumbers = group.getHouseNumbers();
if (houseNumbers == null || houseNumbers.isEmpty()) {
log.warn("任务组 [{}] 未配置 houseNumbers,跳过该组调度", groupId);
return;
}
// 初始化 lastId
groupLastIdMap.computeIfAbsent(groupId, k -> {
Long minId = dataService.selectMinIdByHouseNumber(houseNumbers);
long startId = (minId == null ? 0L : minId);
log.info("任务组 [{}] 初始化起始ID为 {},houseNumbers={}", groupId, startId, houseNumbers);
return new AtomicLong(startId);
});
Runnable task = () -> {
try {
long lastId = groupLastIdMap.get(groupId).get();
List<TaskMessageEntityCommand> cmdList = dataService.selectByHouseNumber(houseNumbers, lastId, group.getLimit());
if (cmdList == null || cmdList.isEmpty()) {
return;
}
// 发布事件
for (TaskMessageEntityCommand cmd : cmdList) {
notifyService.notify(cmd);
}
// 更新 lastId
long maxId = cmdList.stream().map(TaskMessageEntityCommand::getId).max(Comparator.naturalOrder()).orElse(lastId);
groupLastIdMap.get(groupId).set(maxId);
log.info("任务组 [{}] 处理完成:拉取{}条,lastId: {} -> {}", groupId, cmdList.size(), lastId, maxId);
} catch (Exception e) {
log.error("任务组 [{}] 执行异常: {}", groupId, e.getMessage(), e);
}
};
if (group.getCron() != null && !group.getCron().trim().isEmpty()) {
scheduler.schedule(task, new CronTrigger(group.getCron()));
log.info("任务组 [{}] 已按 cron [{}] 调度", groupId, group.getCron());
} else {
long delay = group.getFixedDelayMs() != null ? group.getFixedDelayMs() : 5000L;
scheduler.scheduleWithFixedDelay(task, delay);
log.info("任务组 [{}] 已按 fixedDelayMs [{}] 调度", groupId, delay);
}
}
}- 类职责:
TaskMessageEventJob是一个定时拉取并发布任务消息的调度器。它根据配置的任务组,定期查询待处理消息,发布事件,并记录每个任务组已处理到的最大 ID,用于增量拉取,实现动态补偿。
核心功能:
- groupLastIdMap:用于保存“任务组 → 最近处理的最大 ID”的映射,支持增量拉取,避免重复处理。其类型为 AtomicLong,确保在并发场景下的读写原子性。
- properties:用于读取外部配置(包括任务组、门牌号、调度方式等)。
- scheduler:Spring 的线程池调度器,负责按照 cron 或固定延迟方式执行任务。
- notifyService:用于对外发布事件(通知消费方处理每条消息)。
- dataService:数据查询服务,从存储中按条件拉取消息记录。
初始化流程:
- 读取配置中的 groups;如果未配置,则打印日志并跳过初始化。
- 对每个任务组调用 scheduleGroup(group) 进行独立的调度注册。
注册任务组调度:
- 参数校验:必须配置 houseNumbers(门牌号集合),否则跳过。
- 初始化增量起点 lastId:
- 使用 dataService.selectMinIdByHouseNumber(houseNumbers) 找到该组关注的门牌号中的最小 ID。
- 若查询为空,则从 0 开始;将起始值保存到 groupLastIdMap,供后续增量拉取使用。
- 构造执行体 task:每次调度触发时,按以下步骤处理:
- 读取当前组的 lastId。
- 通过 dataService.selectByHouseNumber(houseNumbers, lastId, group.getLimit()) 拉取“ID 大于 lastId”的消息,最多 limit 条。
- 如果本次没有新消息,则直接返回。
- 遍历消息并发布事件:notifyService.notify(cmd)。
- 计算本批次的最大 ID 并更新 groupLastIdMap,作为下次拉取的起点。
- 打印处理日志,并进行异常捕获与记录。
- 调度方式选择:
- 如果配置了 cron 表达式,则按 CronTrigger 定时执行。
- 否则按固定延迟 fixedDelayMs 执行;未配置时默认 5000ms。
增量与补偿的意义:
- 通过记录 lastId,每次只拉取“新产生”的消息,实现增量处理,避免重复。
- 如果某次运行异常,中断前更新的 lastId 仍在内存中;下次调度会从该值继续,尽量减少漏处理。但为更强的容灾,实际工程中常将 lastId 持久化(例如数据库/Redis),并配合消费方的幂等设计,确保“最多一次”或“至少一次”语义。
配置示例说明:
- group01 使用 cron(每 10 秒执行),拉取门牌号 [1,2,3],每次最多 limit=100 条。
- group02 使用固定延迟(每 5 秒执行),拉取门牌号 [4,5],每次最多 limit=50 条。
四、测试验证
注意,测试时要对 local-task-message 进行重新打包。
26-03-02.17:57:46.753 [main ] INFO Application - Starting Application using Java 17.0.6 with PID 20980 (D:\Personal_projects\local-task-message\local-task-message-test\target\classes started by Dell in D:\Personal_projects\local-task-message\local-task-message-test)
26-03-02.17:57:46.753 [main ] INFO Application - The following 1 profile is active: "dev"
26-03-02.17:57:47.343 [main ] INFO TomcatWebServer - Tomcat initialized with port 8091 (http)
26-03-02.17:57:47.350 [main ] INFO Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8091"]
26-03-02.17:57:47.351 [main ] INFO StandardService - Starting service [Tomcat]
26-03-02.17:57:47.351 [main ] INFO StandardEngine - Starting Servlet engine: [Apache Tomcat/10.1.50]
26-03-02.17:57:47.381 [main ] INFO [/] - Initializing Spring embedded WebApplicationContext
26-03-02.17:57:47.382 [main ] INFO ServletWebServerApplicationContext - Root WebApplicationContext: initialization completed in 606 ms
26-03-02.17:57:47.731 [main ] INFO HikariDataSource - Retail_HikariCP - Starting...
26-03-02.17:57:47.894 [main ] INFO HikariPool - Retail_HikariCP - Added connection com.mysql.cj.jdbc.ConnectionImpl@760a2b6e
26-03-02.17:57:47.895 [main ] INFO HikariDataSource - Retail_HikariCP - Start completed.
26-03-02.17:57:47.936 [main ] INFO TaskMessageEventJob - 任务组 [group01] 初始化起始ID为 51,houseNumbers=[0, 1, 2, 3]
26-03-02.17:57:47.940 [main ] INFO TaskMessageEventJob - 任务组 [group01] 已按 cron [0/10 * * * * ?] 调度
26-03-02.17:57:47.945 [main ] INFO TaskMessageEventJob - 任务组 [group02] 初始化起始ID为 47,houseNumbers=[4, 5, 6, 7, 8, 9]
26-03-02.17:57:47.946 [main ] INFO TaskMessageEventJob - 任务组 [group02] 已按 fixedDelayMs [5000] 调度
26-03-02.17:57:50.031 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO TestTopicListener - 接收消息(mq):{"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}
26-03-02.17:57:50.043 [TaskMessageScheduler-2] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_NEW_2026020601
26-03-02.17:57:50.077 [http-nio-8091-exec-1] INFO [/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
26-03-02.17:57:50.077 [http-nio-8091-exec-1] INFO DispatcherServlet - Initializing Servlet 'dispatcherServlet'
26-03-02.17:57:50.078 [http-nio-8091-exec-1] INFO DispatcherServlet - Completed initialization in 0 ms
26-03-02.17:57:50.129 [http-nio-8091-exec-1] INFO ChatCompletionsController - [ChatCompletions] auth=null, model=gpt-4o, messages=[ChatMessage(role=user, content=1+1)]
26-03-02.17:57:50.129 [http-nio-8091-exec-1] INFO ChatCompletionsController - [ChatCompletions] response id=17472896-2448-45a1-b023-28277bd53022, model=gpt-4o, messages_size=1
26-03-02.17:57:50.168 [TaskMessageScheduler-2] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_NEW_25111502
26-03-02.17:57:50.172 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO TestTopicListener - 接收消息(mq):{"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}
26-03-02.17:57:50.187 [TaskMessageScheduler-2] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_NEW_2026020602
26-03-02.17:57:50.188 [TaskMessageScheduler-2] INFO TaskMessageEventJob - 任务组 [group01] 处理完成:拉取3条,lastId: 51 -> 54
26-03-02.17:57:53.105 [http-nio-8091-exec-3] INFO ChatCompletionsController - [ChatCompletions] auth=null, model=gpt-4o, messages=[ChatMessage(role=user, content=1+1)]
26-03-02.17:57:53.105 [http-nio-8091-exec-3] INFO ChatCompletionsController - [ChatCompletions] response id=140694cb-7d50-4028-a4a3-c12b17dc7118, model=gpt-4o, messages_size=1
26-03-02.17:57:53.113 [TaskMessageScheduler-1] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_NEW_06
26-03-02.17:57:53.114 [http-nio-8091-exec-2] INFO ChatCompletionsController - [ChatCompletions] auth=null, model=gpt-4o, messages=[ChatMessage(role=user, content=1+1)]
26-03-02.17:57:53.114 [http-nio-8091-exec-2] INFO ChatCompletionsController - [ChatCompletions] response id=95a8a5d2-7ade-4dab-b8e4-9fa5df925394, model=gpt-4o, messages_size=1
26-03-02.17:57:53.120 [TaskMessageScheduler-1] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_NEW_25111501
26-03-02.17:57:53.120 [TaskMessageScheduler-1] INFO TaskMessageEventJob - 任务组 [group02] 处理完成:拉取2条,lastId: 47 -> 50

- 启动项目后,任务扫描到未处理的数据,之后执行发送操作。
- 观察日志,你就可以看到之前状态为0的,已经可以做了变更操作。