切面拦截任务操作
一、本章介绍
因为我们做的是一个通用组件项目,对于它的使用应该要提供出更简单轻量的方式,让使用方可以轻量化的接入。所以对于 handleService.acceptTaskMessage(taskMessageEntityCommand) 编码的方式,可以提供更为优化的处理手段。
这里我们选择增加一个本地任务消息的自定义注解,对于配置了此注解的方法进行拦截,并获取入参信息的 taskMessageEntityCommand 对象。之后,开始进行同一个事务或开启新的事物的方式,完成数据的插入操作,并进行 Spring Event 消息推送。这样可以让用户的使用更加简洁。
二、功能流程
如图,通过自定义注解加切面拦截方式,完成本地消息的受理。

首先,添加一个自定义注解,并在 Config 配置类中实现 AOP 切面逻辑。该切面负责拦截标注注解的方法,自动捕获入参并提取任务消息对象。
接着,检查当前事务状态:无事务时新建事务,已存在事务时直接加入。事务内完成数据库表插入后,立即发布 Spring Event 事件。后续流程保持完全一致。
三、编码实现
1. 工程结构

注意,本节涉及 AOP 切面实现,因此需在 pom.xml 中引入 spring-boot-starter-aop 依赖包。
LocalTaskMessage 作为切面专用注解,统一放置于 message 包下,便于一眼识别入口,使用更加直观便捷。
config 目录下的 aop 包用于拦截注解并执行一系列操作,后续需在 AutoConfig 中新增该包的扫描配置,将切面组件加载至 Spring 容器。
2. 功能开发
2.1 自定义注解
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface LocalTaskMessage {
/**
* 实体属性名称
*/
String entityAttributeName() default "";
}- 创建一个仅作用于方法上的自定义注解,并在注解中新增 entityAttributeName 属性(用于指定实体属性名称)。这样,拦截到该注解的方法后,即可根据属性名称精准获取对应的入参对象。
2.2 拦截操作
/**
* @author 仙人球⁶ᴳ |
* @date 2026/3/3 16:51
* @github https://github.com/lixuanfengs
*/
@Slf4j
@Component
@Aspect
public class LocalTaskMessageAop {
private final ILocalTaskMessageHandleService handleService;
private final TransactionTemplate transactionTemplate;
public LocalTaskMessageAop(ILocalTaskMessageHandleService handleService,
TransactionTemplate transactionTemplate) {
this.handleService = handleService;
this.transactionTemplate = transactionTemplate;
}
@Pointcut("@annotation(cn.cactusli.wrench.local.task.message.LocalTaskMessage)")
public void aopPoint() {
}
@Around("aopPoint() && @annotation(localTaskMessage)")
public Object notify(ProceedingJoinPoint joinPoint, LocalTaskMessage localTaskMessage) throws Throwable {
String signature = joinPoint.getSignature().toShortString();
String entityAttributeName = localTaskMessage.entityAttributeName();
// 判断当前是否已有事务;有则共用,无则统一开启并包裹目标方法与切面逻辑
boolean active = TransactionSynchronizationManager.isActualTransactionActive();
if (active) {
try {
Object result = joinPoint.proceed();
TaskMessageEntityCommand command = resolveCommand(joinPoint, entityAttributeName);
if (command != null) {
log.info("LocalTaskMessageAop 提取到命令对象(同一事务): 方法={} 路径={} 命令={}", signature, entityAttributeName, command);
handleService.acceptTaskMessage(command);
} else {
log.warn("LocalTaskMessageAop 未能提取命令对象(同一事务): 方法={} 路径={}", signature, entityAttributeName);
}
return result;
} catch (Throwable e) {
log.error("LocalTaskMessageAop 处理失败: 方法={} 路径={} 错误={}", signature, entityAttributeName, e.getMessage(), e);
throw e;
}
}
// 无事务时,开启新事务并将方法执行与切面处理统一包裹
try {
return transactionTemplate.execute(status -> {
try {
Object result = joinPoint.proceed();
TaskMessageEntityCommand command = resolveCommand(joinPoint, entityAttributeName);
if (command != null) {
log.info("LocalTaskMessageAop 提取到命令对象(新开事务): 方法={} 路径={} 命令={}", signature, entityAttributeName, command);
handleService.acceptTaskMessage(command);
} else {
log.warn("LocalTaskMessageAop 未能提取命令对象(新开事务): 方法={} 路径={}", signature, entityAttributeName);
}
return result;
} catch (Throwable t) {
log.error("LocalTaskMessageAop 事务内处理失败: 方法={} 路径={} 错误={}", signature, entityAttributeName, t.getMessage(), t);
status.setRollbackOnly();
throw new RuntimeException(t);
}
});
} catch (RuntimeException e) {
if (e.getCause() != null) throw e.getCause();
throw e;
}
}
/**
* 根据 entityAttributeName 从方法入参中解析出 TaskMessageEntityCommand
* 支持两种用法:
* 1) "command" 直接为方法参数名,且参数类型为 TaskMessageEntityCommand
* 2) "xxx.command" 首段为方法参数名,后续为对象属性路径,例如 xxx.getCommand()
* 当 entityAttributeName 为空时,回退为从参数列表中直接寻找第一个 TaskMessageEntityCommand 类型的对象。
*/
private TaskMessageEntityCommand resolveCommand(ProceedingJoinPoint joinPoint, String entityAttributeName) {
Object[] args = joinPoint.getArgs();
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method method = methodSignature.getMethod();
// 如果未配置路径,直接从参数中找第一个 TaskMessageEntityCommand
if (entityAttributeName == null || entityAttributeName.trim().isEmpty()) {
for (Object arg : args) {
if (arg instanceof TaskMessageEntityCommand) {
return (TaskMessageEntityCommand) arg;
}
}
return null;
}
String[] path = entityAttributeName.split("\\.");
String paramName = path[0];
// 通过参数名定位入参对象;优先使用反射参数名,其次回退到类型匹配
Object root = findArgumentByNameOrType(method, args, paramName);
if (root == null) {
return null;
}
// 如果仅有首段且类型就是命令,直接返回
if (path.length == 1) {
return (root instanceof TaskMessageEntityCommand) ? (TaskMessageEntityCommand) root : null;
}
// 继续沿路径下钻对象属性
Object current = root;
for (int i = 1; i < path.length; i++) {
if (current == null) return null;
current = extractProperty(current, path[i]);
}
return (current instanceof TaskMessageEntityCommand) ? (TaskMessageEntityCommand) current : null;
}
/**
* 根据参数名或类型在方法入参中查找对应对象。
*/
private Object findArgumentByNameOrType(Method method, Object[] args, String paramName) {
// 先尝试使用反射拿到参数名(JDK8+ 需 -parameters 或带调试信息)
java.lang.reflect.Parameter[] parameters = method.getParameters();
if (parameters.length == args.length) {
for (int i = 0; i < parameters.length; i++) {
if (Objects.equals(parameters[i].getName(), paramName)) {
return args[i];
}
}
}
// 如果参数名无法匹配,尝试按类型回退(避免误匹配,仅在 paramName 看起来像类型时使用)
if ("command".equals(paramName)) {
for (Object arg : args) {
if (arg instanceof TaskMessageEntityCommand) {
return arg;
}
}
}
return null;
}
/**
* 从对象中按属性名提取属性值,优先调用 getter,然后尝试直接访问字段。
*/
private Object extractProperty(Object target, String propertyName) {
Class<?> clazz = target.getClass();
String getterName = "get" + capitalize(propertyName);
try {
Method getter = clazz.getMethod(getterName);
return getter.invoke(target);
} catch (Exception ignore) {
// ignore and fallback to field
}
try {
Field field = clazz.getDeclaredField(propertyName);
field.setAccessible(true);
return field.get(target);
} catch (Exception e) {
return null;
}
}
private String capitalize(String s) {
if (s == null || s.isEmpty()) return s;
char first = s.charAt(0);
if (Character.isUpperCase(first)) return s;
return Character.toUpperCase(first) + s.substring(1);
}
}aopPoint切点配置为拦截自定义注解后,所有标注该注解的方法均会被自动切入。notify(ProceedingJoinPoint joinPoint, LocalTaskMessage localTaskMessage)方法在切入后,即可完成事务状态判断、入参信息提取,并统一执行事务操作与消息通知。
2.3 扫描切面
@Configuration
@EnableAsync
@EnableScheduling
@EnableConfigurationProperties(value = {
LocalTaskMessageAutoProperties.class})
@ComponentScan(basePackages = {
"cn.cactusli.wrench.local.task.message.domain.*",
"cn.cactusli.wrench.local.task.message.infrastructure.*",
"cn.cactusli.wrench.local.task.message.trigger.*",
"cn.cactusli.wrench.local.task.message.config.aop.*"})
public class LocalTaskMessageAutoConfig {
@Bean
public OkHttpClient okHttpClient() {
ConnectionPool pool = new ConnectionPool(10, 5, TimeUnit.MINUTES);
return new OkHttpClient.Builder()
.connectionPool(pool)
.retryOnConnectionFailure(true)
.connectTimeout(100, TimeUnit.SECONDS)
.readTimeout(300, TimeUnit.SECONDS)
.writeTimeout(300, TimeUnit.SECONDS)
.build();
}
@Bean
public GenericHttpGateway genericHttpGateway(OkHttpClient okHttpClient) {
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://127.0.0.1/")
.addConverterFactory(GsonConverterFactory.create())
.client(okHttpClient)
.build();
return retrofit.create(GenericHttpGateway.class);
}
@Bean("taskMessageScheduler")
public ThreadPoolTaskScheduler taskMessageScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(2);
scheduler.setThreadNamePrefix("TaskMessageScheduler-");
scheduler.initialize();
return scheduler;
}
}- 在 LocalTaskMessageAutoConfig 配置扫描范围时,新增 aop 包的扫描路径,从而将切面组件交由 Spring 容器统一管理。
四、测试验证

26-03-05.09:56:33.237 [main ] INFO ApiTest - The following 1 profile is active: "dev"
26-03-05.09:56:35.104 [main ] INFO HikariDataSource - Retail_HikariCP - Starting...
26-03-05.09:56:35.395 [main ] INFO HikariPool - Retail_HikariCP - Added connection com.mysql.cj.jdbc.ConnectionImpl@2e40fdbd
26-03-05.09:56:35.396 [main ] INFO HikariDataSource - Retail_HikariCP - Start completed.
26-03-05.09:56:35.438 [main ] INFO TaskMessageEventJob - 任务组 [group01] 初始化起始ID为 0,houseNumbers=[0, 1, 2, 3]
26-03-05.09:56:35.446 [main ] INFO TaskMessageEventJob - 任务组 [group01] 已按 cron [0/10 * * * * ?] 调度
26-03-05.09:56:35.450 [main ] INFO TaskMessageEventJob - 任务组 [group02] 初始化起始ID为 0,houseNumbers=[4, 5, 6, 7, 8, 9]
26-03-05.09:56:35.451 [main ] INFO TaskMessageEventJob - 任务组 [group02] 已按 fixedDelayMs [5000] 调度
26-03-05.09:56:36.190 [main ] INFO CachingConnectionFactory - Attempting to connect to: [10.2.16.71:5672]
26-03-05.09:56:36.254 [main ] INFO CachingConnectionFactory - Created new connection: rabbitConnectionFactory#7cc2c551:0/SimpleConnection@7d551ec6 [delegate=amqp://[email protected]:5672/, localPort=46127]
26-03-05.09:56:36.335 [main ] INFO ApiTest - Started ApiTest in 3.594 seconds (process running for 4.514)
Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
26-03-05.09:56:36.807 [main ] INFO LocalTaskMessageAop - LocalTaskMessageAop 提取到命令对象(同一事务): 方法=TestAopAnnotationService.insertOrderTransactional_01(..) 路径=command 命令=TaskMessageEntityCommand(id=null, taskId=TASK_537625111801, taskName=gpt调用测试任务, notifyType=http, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=null, http=TaskMessageEntityCommand.NotifyConfig.HTTP(url=http://127.0.0.1:8091/v1/chat/completions, method=post, contentType=application/json, authorization=null)), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]})
26-03-05.09:56:36.812 [main ] INFO LocalTaskMessageHandleService - 受理任务消息: TaskMessageEntityCommand(id=null, taskId=TASK_537625111801, taskName=gpt调用测试任务, notifyType=http, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=null, http=TaskMessageEntityCommand.NotifyConfig.HTTP(url=http://127.0.0.1:8091/v1/chat/completions, method=post, contentType=application/json, authorization=null)), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]})
26-03-05.09:56:36.962 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 消息内容: TaskMessageEntityCommand(id=null, taskId=TASK_537625111801, taskName=gpt调用测试任务, notifyType=http, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=null, http=TaskMessageEntityCommand.NotifyConfig.HTTP(url=http://127.0.0.1:8091/v1/chat/completions, method=post, contentType=application/json, authorization=null)), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}), 事件时间戳: 1772675796956
26-03-05.09:56:36.985 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] INFO SimpleMessageListenerContainer - Waiting for workers to finish.
26-03-05.09:56:37.189 [task-1 ] INFO LocalTaskMessageRepository - 更新任务状态为成功,taskId: TASK_537625111801
26-03-05.09:56:37.189 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 通知结果: {"id":"55744752-084c-4024-9b69-4c0a4abdf586","model":"gpt-4o","messages":[{"role":"user","content":"1+1"}]}
26-03-05.09:56:37.329 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-2] INFO SimpleMessageListenerContainer - Successfully waited for workers to finish.
26-03-05.09:56:37.337 [SpringApplicationShutdownHook] INFO HikariDataSource - Retail_HikariCP - Shutdown initiated...
26-03-05.09:56:37.346 [SpringApplicationShutdownHook] INFO HikariDataSource - Retail_HikariCP - Shutdown completed.在测试工程中,引入本节最新打包的 local-task-message 包,并同时引入 aop 切面相关依赖。
随后编写 TestAopAnnotationService,通过注解方式调用即可。这样业务代码无需手动编写任何任务消息逻辑,整体结构更加简洁