任务表设计和数据写入
一、本章介绍
设计通用的本地消息任务表及其数据插入处理。该表由引入技术组件的上游系统自行在数据库中创建配置。随后,在调用组件时,即可使用同一数据源对该库表进行统一操作。
二、功能流程
如图,是本节关于写库操作的功能流程。

首先,为了在组件中实现数据库表的操作,需要引入 DataSource 数据源,从而确保在处理数据库操作时,能够使用同一数据库连接进行统一管理。
其次,我们将直接采用 JDBC 的方式来操作数据库,而不额外引入 MyBatis 框架。这样做的目的是在上游系统集成组件时,避免潜在的版本冲突问题。采用最原始的方法,其兼容性也最为出色。
此外,本节还需要独立设计任务消息表。
三、库表设计

通用的本地消息表,需要为每次任务生成唯一的任务ID,例如订单号、券号或拼接的UUID等,只要在业务流程中确保其唯一性即可。本设计未添加额外的业务分类描述,若公司在实际应用中涉及更复杂的分类或统计需求,可自行扩展相关字段。
接下来是任务名称和通知类型(例如MQ或HTTP),通知可通过不同方式处理外部请求。随后是具体通知类型的配置参数,由于MQ和HTTP调用各有特定参数,此部分以TEXT格式存储;在MySQL 8.x版本中,可进一步采用JSON格式进行存储,以提升灵活性。
其后是任务的执行状态,以及任务执行时的入参信息,即用于通知外部系统的具体内容,例如HTTP请求的参数或MQ发送的消息主体。
此外,还需设计一个“门牌号”字段。如果任务表缺少此区分机制,即使配置多个Job任务,扫描时仍会处理相同的数据。因此,通过添加门牌号进行分区,例如配置Job A扫描ID为1、2、3的记录,Job B扫描ID为4、5、6的记录,从而有效提升整体吞吐量。
四、编码实现
1. 工程结构

首先,在工程的 infrastructure 层中添加 DAO 操作。该部分与日常使用的 DAO 并无差异,仅其实现(impl)采用 JDBC,而日常工程中通常使用 MyBatis。
随后,在 domain 领域层定义操作数据库的 adapter 适配器下的仓储接口 ILocalTaskMessageRepository,然后在 infrastructure 层实现该接口,以完成数据库操作。
最后,在 LocalTaskMessageHandleService 的消息发布处理中,首先执行数据库表数据的插入操作。
2. 功能开发
2.1 对象扩展
@Data
public class TaskMessageEntityCommand {
/**
* 任务ID
*/
private String taskId;
/**
* 任务名称
*/
private String taskName;
/**
* 任务类型
*/
private String notifyType;
/**
* 通知配置
*/
private NotifyConfig notifyConfig;
/**
* 任务状态
*/
private Integer status;
/**
* 参数配置
*/
private String parameterJson;
public TaskMessageEntityCommand() {
}
public TaskMessageEntityCommand(String taskId, String taskName, TaskNotifyEnum taskNotifyEnum, NotifyConfig notifyConfig, String parameterJson) {
this.taskId = taskId;
this.taskName = taskName;
this.notifyType = taskNotifyEnum.getType();
this.notifyConfig = notifyConfig;
this.status = 0;
this.parameterJson = parameterJson;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class NotifyConfig {
// mq 配置
private MQ mq;
// http 配置
private HTTP http;
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class MQ {
private String topic;
private String exchange;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class HTTP {
private String url;
private String method;
private String contentType;
private String authorization;
}
}
}- 为了满足新数据库表的操作需求,需要对 TaskMessageEntityCommand 对象进行字段扩展。这些扩展字段基本与数据库所需的配置保持一致,仅额外引入了 NotifyConfig 下的 MQ 和 HTTP 对象。这样设计的目的,是让使用方能够在一个类中清晰地了解所有可用对象。该部分也可独立拆分为值对象形式,其中 MQ 和 HTTP 配置即作为值对象;然而,在用户实际使用时,从单一包中引入的体验,可能不如直接从一个类中获取更直观明了。
2.2 dao 操作
@Slf4j
@Component
public class TaskMessageDaoImpl implements ITaskMessageDao {
private final DataSource dataSource;
public TaskMessageDaoImpl(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public int insert(TaskMessagePO taskMessagePO) throws SQLException {
String sql = "INSERT INTO local_task_message (task_id, task_name, notify_type, notify_config, status, parameter_json, house_number ,create_time, update_time) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
try (Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, taskMessagePO.getTaskId());
ps.setString(2, taskMessagePO.getTaskName());
ps.setString(3, taskMessagePO.getNotifyType());
ps.setString(4, taskMessagePO.getNotifyConfig());
ps.setInt(5, taskMessagePO.getStatus());
ps.setString(6, taskMessagePO.getParameterJson());
ps.setInt(7, taskMessagePO.getHouseNumber());
ps.setObject(8, taskMessagePO.getCreateTime());
ps.setObject(9, taskMessagePO.getUpdateTime());
return ps.executeUpdate();
} catch (SQLException e) {
log.error("插入任务消息失败,taskId: {}", taskMessagePO.getTaskId(), e);
throw e;
}
}
}- 注入 DataSource 数据源,通过 jdbc 最原始方式,操作数据库插入的处理。
2.3 适配接口
public interface ILocalTaskMessageRepository {
/**
* 保存任务消息
* @param command 任务消息实体命令
*/
void saveTaskMessage(TaskMessageEntityCommand command) throws Exception;
}
@Slf4j
@Repository
public class LocalTaskMessageRepository implements ILocalTaskMessageRepository {
private final ITaskMessageDao taskMessageDao;
public LocalTaskMessageRepository(ITaskMessageDao taskMessageDao) {
this.taskMessageDao = taskMessageDao;
}
@Override
public void saveTaskMessage(TaskMessageEntityCommand command) throws Exception {
TaskMessagePO po = new TaskMessagePO();
po.setTaskId(command.getTaskId());
po.setTaskName(command.getTaskName());
po.setNotifyType(command.getNotifyType());
po.setStatus(command.getStatus());
po.setParameterJson(command.getParameterJson());
// 将NotifyConfig对象转换为JSON字符串
if (command.getNotifyConfig() != null) {
po.setNotifyConfig(JSON.toJSONString(command.getNotifyConfig()));
}
// 根据任务ID计算哈希值,取正数,获取最后一位数字作为门牌号
int hashCode = Math.abs(command.getTaskId().hashCode());
int houseNumber = hashCode % 10;
po.setHouseNumber(houseNumber);
po.setCreateTime(LocalDateTime.now());
po.setUpdateTime(LocalDateTime.now());
try {
int result = taskMessageDao.insert(po);
if (1 != result) {
throw new RuntimeException("result is not 1 taskId:{}" + command.getTaskId());
}
} catch (Exception e) {
log.error("保存任务消息失败,taskId: {} {}", command.getTaskId(), JSON.toJSONString(command), e);
throw e;
}
}
}这一部分需要在 domain 层定义 ILocalTaskMessageRepository 接口,随后在基础设施层实现该接口,即 LocalTaskMessageRepository。
其实现逻辑是将 TaskMessageEntityCommand 领域对象转换为数据库的 PO 对象,并调用 DAO 完成数据插入操作。
2.4 服务调用
@Slf4j
@Service
public class LocalTaskMessageHandleService implements ILocalTaskMessageHandleService {
@Resource
private ILocalTaskMessageEvent event;
@Resource
private ILocalTaskMessageRepository repository;
@Override
public void acceptTaskMessage(TaskMessageEntityCommand command) {
try {
log.info("受理任务消息: {}", command);
// 1. 保存任务消息
repository.saveTaskMessage(command);
// 2. 发布事件消息
event.publishEvent(command);
} catch (Exception e) {
log.error("受理任务消息执行失败 {}", JSON.toJSONString(command), e);
}
}
}- 之后就要扩展服务的调用了,在受理任务消息的时候,调用数据库操作,保存消息,之后才是发送事件消息。
五、功能测试
1. 构建组件
先 clean 以及 install 构建组件,让测试工程可以引入到最新的组件代码。

2. 导入库表

- 如图所示,导入库表。
3. 单测案例
3.1单测代码
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApiTest {
@Resource
private ILocalTaskMessageHandleService handleService;
@Resource
private IGroupBuyOrderListDao groupBuyOrderListDao;
@Test
public void test() throws InterruptedException {
// 事务测试
insertOrderTransactional();
new CountDownLatch(1).await();
}
@Transactional
public void insertOrderTransactional() throws InterruptedException {
// 构造插入数据
GroupBuyOrderList order = GroupBuyOrderList.builder()
.userId("xfg704")
.teamId("69268465")
.orderId("537625111201")
.activityId(100123L)
.startTime(new Date())
.endTime(new Date(System.currentTimeMillis() + 3600_000))
.goodsId("9890001")
.source("s01")
.channel("c01")
.originalPrice(new BigDecimal("99.00"))
.deductionPrice(new BigDecimal("10.00"))
.payPrice(new BigDecimal("89.00"))
.status(0)
.outTradeNo("406025111201")
.bizId("100123_25111201")
.build();
// 执行插入
groupBuyOrderListDao.insert(order);
TaskMessageEntityCommand taskMessageEntityCommand = new TaskMessageEntityCommand(
"TASK_NEW_01",
"gpt调用测试任务",
TaskNotifyEnum.HTTP,
TaskMessageEntityCommand.NotifyConfig.builder()
.http(TaskMessageEntityCommand.NotifyConfig.HTTP.builder()
.url("http://127.0.0.1:8091/v1/chat/completions")
.method("post")
.contentType("application/json")
.build())
.build(),
"{\"model\": \"gpt-4o\", \"messages\": [{\"role\": \"user\", \"content\": \"1+1\"}]}"
);
handleService.acceptTaskMessage(taskMessageEntityCommand);
new CountDownLatch(1).await();
}
}
- 这部分会在同一个方法的事务注解下,完成业务的数据表插入,之后执行任务操作。
- 执行后可以观察操作日志,也可以如上一节打断点调试。之后你就可以在数据库表看到数据了