通知策略处理(HTTP&MQ)
一、本章介绍
设计通用的本地消息任务表及其数据插入处理。该表由引入技术组件的上游系统自行在数据库中创建配置。随后,在调用组件时,即可使用同一数据源对该库表进行统一操作。
二、功能流程
如图,是本节关于事件消息回调通知的处理流程(暂时先不处理关于数据库的更新流程);
.drawio.png)
首先,事件消息的监听发生在 trigger 层,随后调用领域层的通知服务方法。为此,需要新增一个领域方法。
其次,领域服务方法根据 notifyType 的不同类型执行通知操作,例如 HTTP 或 MQ;若未来需扩展其他类型,可在此处直接添加。
最后,HTTP 和 MQ 操作均进入基础设施层完成服务调用处理。其中,HTTP 使用 Retrofit2 框架进行封装;MQ(RabbitMQ)则直接通过 RabbitTemplate 模板推送消息即可。
三、编码实现
1. 工程结构

- 本节的重点在于,实现HTTP远程调用和MQ(RabbitMQ)消息推送服务,随后由TaskMessageEventListener进行调用和使用。
2. 功能开发
2.1 HTTP 框架使用

在基础设施层,定义一个通用的HTTP服务接口GenericHttpGateway。随后,在config配置下的LocalTaskMessageAutoConfig类中进行实例化操作。
简而言之,就是定义一个通用的HTTP调用处理器,使用时只需传入URL、headers和body参数即可完成调用处理。该框架较为面向对象,使用起来简洁轻量,因此选择采用(当然,你也可以替换为其他HTTP框架)。
此外,如果你还想扩展其他调用方式,例如WebSocket、Socket或RPC(泛化调用),也可以在基础设施层进行相应扩展。
2.2 MQ 消息推送
@Slf4j
@Component
public class RabbitMQEvent {
@Autowired(required = false)
private RabbitTemplate rabbitTemplate;
public void publish(String exchange, String routingKey, String message) {
try {
if (null == rabbitTemplate){
log.error("应用服务方,尚未配置 RabbitMQ Template 不能完成 MQ 发送");
return;
}
rabbitTemplate.convertAndSend(exchange, routingKey, message, m -> {
// 持久化消息配置
m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return m;
});
} catch (Exception e) {
log.error("发送MQ消息失败 exchange:{} routingKey:{} message:{}", exchange, routingKey, message, e);
throw e;
}
}
}- RabbitMQ 推送需要使用 RabbitTemplate 模板,不过注入的时候要配置
@Autowired(required = false)这样如果用户没有使用 RabbitMQ 也不会报错 - 之后 publish 下的代码是推送消息的通用逻辑,可以指定交换机进行推送。
2.3 通知策略(http、mq)
public interface INotifyStrategy {
String notify(TaskMessageEntityCommand command) throws Exception;
}
@Component("httpNotifyStrategy")
public class HTTPNotifyStrategy implements INotifyStrategy {
@Resource
private ILocalTaskMessagePort port;
@Override
public String notify(TaskMessageEntityCommand command) throws Exception {
try {
return port.notify2http(command);
} catch (Exception e) {
throw e;
}
}
}
@Component("rabbitMQNotifyStrategy")
public class RabbitMQNotifyStrategy implements INotifyStrategy {
@Resource
private ILocalTaskMessagePort port;
@Override
public String notify(TaskMessageEntityCommand command) throws Exception {
try {
return port.notify2rabbitmq(command);
} catch (Exception e) {
throw e;
}
}
}对于我们指定的枚举定义,扩展出获取策略(strategy)的方法,并为每种类型的发送方式添加对应的策略处理。
借助这一配置,我们在获取用户当前使用的枚举时,即可轻松取得需执行的对应策略。这种设计在互联网ToC(面向消费者)的复杂业务场景中极为常见,且是一种高效的解决方案。
public class LocalTaskMessageNotifyService implements ILocalTaskMessageNotifyService {
private final Map<String, INotifyStrategy> notifyStrategyConfig;
public LocalTaskMessageNotifyService(Map<String, INotifyStrategy> notifyStrategyConfig, ILocalTaskMessageRepository repository) {
this.notifyStrategyConfig = notifyStrategyConfig;
}
@Override
public String notify(TaskMessageEntityCommand command) throws Exception {
// 获取通知策略
String strategyBeanName = TaskNotifyEnum.getStrategyByType(command.getNotifyType());
INotifyStrategy notifyStrategy = notifyStrategyConfig.get(strategyBeanName);
// 执行通知操作
return notifyStrategy.notify(command);
}
}- 最后就可以实现策略通知调用的方法,并在里面完成通知调用操作。
四、测试验证
1. 环境配置
在执行测试前,需要在本地安装RabbitMQ。推荐采用Docker方式进行安装,若出现问题,也能轻松便捷地删除。
1.1 基础环境
- Mac电脑,安装Docker后,直接点击执行即可安装。
- Windows电脑,安装Docker后,需要在powershell里,切换到此文件夹,通过执行
docker-compose-f docker-compose-environment.yml up -d执行。


- 安装到这里,你就可以在你的程序里使用 RabbitMQ 了。
1.2 工程环境
server:
port: 8091
# 数据库配置;启动时配置数据库资源信息
spring:
datasource:
username: root
password: Nstr.234808
url: jdbc:mysql://10.2.16.69:3306/local_task_message?useUnicode=true&characterEncoding=utf8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&useSSL=true&sessionVariables=sql_mode='NO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES'
driver-class-name: com.mysql.cj.jdbc.Driver
type: com.zaxxer.hikari.HikariDataSource
hikari:
pool-name: Retail_HikariCP
minimum-idle: 15 #最小空闲连接数量
idle-timeout: 180000 #空闲连接存活最大时间,默认600000(10分钟)
maximum-pool-size: 25 #连接池最大连接数,默认是10
auto-commit: true #此属性控制从池返回的连接的默认自动提交行为,默认值:true
max-lifetime: 1800000 #此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
connection-timeout: 30000 #数据库连接超时时间,默认30秒,即30000
connection-test-query: SELECT 1
# RabbitMQ
rabbitmq:
addresses: 10.2.16.71
port: 15672
username: admin
password: admin
listener:
simple:
prefetch: 1 # 每次投递n个消息,消费完在投递n个
template:
delivery-mode: persistent # 确保全局默认设置为持久化(可选)
# 消息配置
config:
# 生产者
producer:
# 绑定交换机,统一一套交换机
exchange: ltm_test_exchange
# 消息主题配置;路由key、队列
topic_order_success:
# 消息主题
routing_key: topic.order_success
# 消费队列
queue: ltm_test_queue_2_order_success
# MyBatis 配置【如需使用记得打开】
mybatis:
mapper-locations: classpath:/mybatis/mapper/*.xml
config-location: classpath:/mybatis/config/mybatis-config.xml
logging:
level:
root: info
config: classpath:logback-spring.xml- 测试工程需配置RabbitMQ连接信息。注意将addresses修改为你自己的值。若RabbitMQ部署在服务端,请确保端口已开放。
2. 回调测试
测试前,请先启动服务端工程的 Application(主启动类),因为测试工程对外提供了 HTTP 接口服务。具体配置可直接查看测试工程代码。
此外,测试工程还配置了 RabbitMQ 的连接信息(addresses、port 等),同样可直接阅读代码获取。
2.1 http 回调
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class Notify2HTTPTest {
@Resource
private ILocalTaskMessageHandleService handleService;
@Test
public void test() throws InterruptedException {
TaskMessageEntityCommand taskMessageEntityCommand = new TaskMessageEntityCommand(
"TASK_NEW_25111501",
"gpt调用测试任务",
TaskNotifyEnum.HTTP,
TaskMessageEntityCommand.NotifyConfig.builder()
.http(TaskMessageEntityCommand.NotifyConfig.HTTP.builder()
.url("http://127.0.0.1:8091/v1/chat/completions")
.method("post")
.contentType("application/json")
.build())
.build(),
"{\"model\": \"gpt-4o\", \"messages\": [{\"role\": \"user\", \"content\": \"1+1\"}]}"
);
handleService.acceptTaskMessage(taskMessageEntityCommand);
new CountDownLatch(1).await();
}
}
26-02-06.17:29:18.634 [http-nio-8091-exec-1] INFO ChatCompletionsController - [ChatCompletions] response id=2dacd96b-5841-4e85-aa62-d71d2b367ccd, model=gpt-4o, messages_size=1- 测试工程,ChatCompletionsController 接口日志。
26-02-06.17:28:59.714 [main ] INFO LocalTaskMessageHandleService - 受理任务消息: TaskMessageEntityCommand(taskId=TASK_NEW_25111501, taskName=gpt调用测试任务, notifyType=http, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=null, http=TaskMessageEntityCommand.NotifyConfig.HTTP(url=http://127.0.0.1:8091/v1/chat/completions, method=post, contentType=application/json, authorization=null)), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]})
26-02-06.17:29:02.692 [main ] INFO HikariDataSource - Retail_HikariCP - Starting...
26-02-06.17:29:06.983 [main ] INFO HikariPool - Retail_HikariCP - Added connection com.mysql.cj.jdbc.ConnectionImpl@3adbd038
26-02-06.17:29:07.010 [main ] INFO HikariDataSource - Retail_HikariCP - Start completed.
26-02-06.17:29:12.798 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 消息内容: TaskMessageEntityCommand(taskId=TASK_NEW_25111501, taskName=gpt调用测试任务, notifyType=http, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=null, http=TaskMessageEntityCommand.NotifyConfig.HTTP(url=http://127.0.0.1:8091/v1/chat/completions, method=post, contentType=application/json, authorization=null)), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}), 事件时间戳: 1770370151627
26-02-06.17:29:18.661 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 通知结果: {"id":"2dacd96b-5841-4e85-aa62-d71d2b367ccd","model":"gpt-4o","messages":[{"role":"user","content":"1+1"}]}- TaskNotifyEnum.HTTP、TaskMessageEntityCommand.NotifyConfig.HTTP 要匹配,配置了什么类型,就要给对应的参数。
2.2 mq 通知
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class Notify2RabbitMQTest {
@Resource
private ILocalTaskMessageHandleService handleService;
@Value("${spring.rabbitmq.config.producer.exchange}")
private String exchange;
@Value("${spring.rabbitmq.config.producer.topic_order_success.routing_key}")
private String routingKey;
@Test
public void test() throws InterruptedException {
TaskMessageEntityCommand command1 = new TaskMessageEntityCommand();
command1.setTaskId("TASK_NEW_2026020601");
command1.setTaskName("gpt调用测试任务");
command1.setNotifyType(TaskNotifyEnum.RABBIT_MQ.getType());
command1.setNotifyConfig(
TaskMessageEntityCommand.NotifyConfig.builder()
.mq(TaskMessageEntityCommand.NotifyConfig.MQ.builder()
.exchange(exchange)
.topic(routingKey)
.build())
.build());
command1.setStatus(0);
command1.setParameterJson("{\"model\": \"gpt-4o\", \"messages\": [{\"role\": \"user\", \"content\": \"1+1\"}]}");
handleService.acceptTaskMessage(command1);
new CountDownLatch(1).await();
}
}
26-02-06.17:43:10.027 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO TestTopicListener - 接收消息(mq):{"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}- 测试工程,TestTopicListener 监听业务 MQ 消息日志。
26-02-06.17:41:42.450 [main ] INFO LocalTaskMessageHandleService - 受理任务消息: TaskMessageEntityCommand(taskId=TASK_NEW_2026020601, taskName=gpt调用测试任务, notifyType=rabbit_mq, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=TaskMessageEntityCommand.NotifyConfig.MQ(topic=topic.order_success, exchange=ltm_test_exchange), http=null), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]})
26-02-06.17:41:45.360 [main ] INFO HikariDataSource - Retail_HikariCP - Starting...
26-02-06.17:41:49.517 [main ] INFO HikariPool - Retail_HikariCP - Added connection com.mysql.cj.jdbc.ConnectionImpl@8aa1562
26-02-06.17:41:49.555 [main ] INFO HikariDataSource - Retail_HikariCP - Start completed.
26-02-06.17:41:52.933 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 消息内容: TaskMessageEntityCommand(taskId=TASK_NEW_2026020601, taskName=gpt调用测试任务, notifyType=rabbit_mq, notifyConfig=TaskMessageEntityCommand.NotifyConfig(mq=TaskMessageEntityCommand.NotifyConfig.MQ(topic=topic.order_success, exchange=ltm_test_exchange), http=null), status=0, parameterJson={"model": "gpt-4o", "messages": [{"role": "user", "content": "1+1"}]}), 事件时间戳: 1770370912877
26-02-06.17:43:09.966 [Retail_HikariCP:housekeeper] WARN HikariPool - Retail_HikariCP - Thread starvation or clock leap detected (housekeeper delta=1m20s305ms208µs800ns).
26-02-06.17:43:09.991 [task-1 ] INFO TaskMessageEventListener - 收到任务消息事件 - 通知结果: success- MQ 的通知操作,通用要配置对应的信息,也就是发送 MQ 的信息,这样组件就知道你要发送什么了。