FlowAgent执行链路设计
一、介绍
在测试阶段的 Agent Test 代码中,我们将使用设计模式将各个执行步骤进行拆分,以提升代码的可读性和可维护性。
本节的目标是: 将上一节中编写的 FlowAgent 测试代码,按照模块化流程进行结构化拆分,使每个执行步骤职责更加清晰,方便后续调试与扩展。
二、功能流程
如图,Flow Ai Agent 动态步骤分析执行流程图;

本节新增一种 Agent 执行策略:流程步骤拆分执行。相比上一版 AutoAgent,更为简洁。
流程包含四步(对应 FlowAgentTest.test_agent):
- Step1 工具分析:识别可用 MCP 工具与能力
- Step2 动作规划:生成总体行动方案
- Step3 拆分步骤:细化为可执行的子步骤序列
- Step4 执行节点(循环执行):按序逐步执行并记录结果
最后产出统一的响应结果。后续章节将用 SSE 将结果推送至前端;本节暂时仅做判 null 处理,不发送 SSE 数据。
三、工程实现
1. 工程结构

execute用于执行 Agent 操作,目前支持两种方案:auto 和 flow。其中,flow 是本节新增的执行方式,通过设计明确的流程步骤来完成任务执行。在 flow 模式下的代码逻辑,来源于
FlowAgentTest.test_agent的拆解内容,整体结构与逻辑基本保持一致,只是进行了模块化与策略化封装,方便扩展与维护。
2. 修改说明
- 新增 RootNode、Step1~4、AbstractExecuteSupport、DefaultFlowAgentExecuteStrategyFactory,用于模块化拆解
FlowAgentTest.test_agent中的逻辑与代码结构。 - FlowAgentExecuteStrategy 负责在执行前进行统一的逻辑处理与调度,随后调用具体的 Agent 实现以完成核心执行。
3. 库表数据

- 本节不需要调整库表,只是增加一套新的 ai agent 对应的配置即可。
4. 拆分节点 - 工具分析

如图,将测试代码的第一个步骤拆分为第一个节点;按同样方式,依次创建 1、2、3、4 个节点,分别对应并承接测试代码中的各个执行步骤,实现流程化、模块化的执行路径。
4.1 工具分析
@Slf4j
@Service
public class Step1McpToolsAnalysisNode extends AbstractExecuteSupport {
@Resource
private Step2PlanningNode step2PlanningNode;
@Override
protected String doApply(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
log.info("\n--- 步骤1: MCP工具能力分析(仅分析阶段,不执行用户请求) ---");
// 获取配置信息
AiAgentClientFlowConfigVO aiAgentClientFlowConfigVO = dynamicContext.getAiAgentClientFlowConfigVOMap().get(AiClientTypeEnumVO.TOOL_MCP_CLIENT.getCode());
// 获取MCP工具分析客户端
ChatClient mcpToolsChatClient = getChatClientByClientId(aiAgentClientFlowConfigVO.getClientId());
String mcpAnalysisPrompt = String.format(
"""
# MCP工具能力分析任务
## 重要说明
**注意:本阶段仅进行MCP工具能力分析,不执行用户的实际请求。**\s
这是一个纯分析阶段,目的是评估可用工具的能力和适用性,为后续的执行规划提供依据。
## 用户请求
%s
## 分析要求
请基于上述实际的MCP工具信息,针对用户请求进行详细的工具能力分析(仅分析,不执行):
### 1. 工具匹配分析
- 分析每个可用工具的核心功能和适用场景
- 评估哪些工具能够满足用户请求的具体需求
- 标注每个工具的匹配度(高/中/低)
### 2. 工具使用指南
- 提供每个相关工具的具体调用方式
- 说明必需的参数和可选参数
- 给出参数的示例值和格式要求
### 3. 执行策略建议
- 推荐最优的工具组合方案
- 建议工具的调用顺序和依赖关系
- 提供备选方案和降级策略
### 4. 注意事项
- 标注工具的使用限制和约束条件
- 提醒可能的错误情况和处理方式
- 给出性能优化建议
### 5. 分析总结
- 明确说明这是分析阶段,不要执行用的任何实际操作
- 总结工具能力评估结果
- 为后续执行阶段提供建议
请确保分析结果准确、详细、可操作,并再次强调这仅是分析阶段。""",
dynamicContext.getCurrentTask()
);
String mcpToolsAnalysis = mcpToolsChatClient.prompt()
.user(mcpAnalysisPrompt)
.call()
.content();
log.info("MCP工具分析结果(仅分析,未执行实际操作): {}", mcpToolsAnalysis);
// 保存分析结果到上下文
dynamicContext.setValue("mcpToolsAnalysis", mcpToolsAnalysis);
// 发送SSE结果
AutoAgentExecuteResultEntity result = AutoAgentExecuteResultEntity.createAnalysisSubResult(
dynamicContext.getStep(),
"analysis_tools",
mcpToolsAnalysis,
requestParameter.getSessionId());
sendSseResult(dynamicContext, result);
// 更新步骤
dynamicContext.setStep(dynamicContext.getStep() + 1);
return router(requestParameter, dynamicContext);
}
@Override
public StrategyHandler<ExecuteCommandEntity, DefaultFlowAgentExecuteStrategyFactory.DynamicContext, String> get(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
return step2PlanningNode;
}
}此步骤的目标是 MCP 工具能力分析节点。 通过读取当前服务配置的能力信息,Agent 能够判断有哪些可用的 MCP 工具,从而结合这些能力来解析并解决用户的提问。
4.2 步骤规划
@Slf4j
@Service
public class Step2PlanningNode extends AbstractExecuteSupport {
@Resource
private Step3ParseStepsNode step3ParseStepsNode;
@Override
protected String doApply(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
log.info("\n--- 步骤2: 执行步骤规划 ---");
// 获取配置信息
AiAgentClientFlowConfigVO aiAgentClientFlowConfigVO = dynamicContext.getAiAgentClientFlowConfigVOMap().get(AiClientTypeEnumVO.PLANNING_CLIENT.getCode());
// 获取规划客户端
ChatClient planningChatClient = getChatClientByClientId(aiAgentClientFlowConfigVO.getClientId());
String userRequest = dynamicContext.getCurrentTask();
String mcpToolsAnalysis = dynamicContext.getValue("mcpToolsAnalysis");
String planningPrompt = buildStructuredPlanningPrompt(userRequest, mcpToolsAnalysis);
String refinedPrompt = planningPrompt + "\n\n## ⚠️ 工具映射验证反馈\n" +
"\n\n**请根据上述验证反馈重新生成规划,确保:**\n" +
"1. 只使用验证报告中列出的有效工具\n" +
"2. 工具名称必须完全匹配(区分大小写)\n" +
"3. 每个步骤明确指定使用的MCP工具\n" +
"4. 避免使用不存在或无效的工具";
String planningResult = planningChatClient.prompt()
.user(refinedPrompt)
.call()
.content();
log.info("执行步骤规划结果: {}", planningResult);
// 保存规划结果到上下文
dynamicContext.setValue("planningResult", planningResult);
// 发送SSE结果
AutoAgentExecuteResultEntity result = AutoAgentExecuteResultEntity.createAnalysisSubResult(
dynamicContext.getStep(),
"analysis_strategy",
planningResult,
requestParameter.getSessionId());
sendSseResult(dynamicContext, result);
// 更新步骤
dynamicContext.setStep(dynamicContext.getStep() + 1);
return router(requestParameter, dynamicContext);
}
/**
* 构建结构化的规划提示词
*/
private String buildStructuredPlanningPrompt(String userRequest, String mcpToolsAnalysis) {
StringBuilder prompt = new StringBuilder();
// 1. 任务分析部分 - 通用化用户需求分析
prompt.append("# 智能执行计划生成\n\n");
prompt.append("## 📋 用户需求分析\n");
prompt.append("**完整用户请求:**\n");
prompt.append("```\n");
prompt.append(userRequest);
prompt.append("\n```\n\n");
prompt.append("**⚠️ 重要提醒:** 在生成执行计划时,必须完整保留和传递用户请求中的所有详细信息,包括但不限于:\n");
prompt.append("- 任务的具体目标和期望结果\n");
prompt.append("- 涉及的数据、参数、配置等详细信息\n");
prompt.append("- 特定的业务规则、约束条件或要求\n");
prompt.append("- 输出格式、质量标准或验收条件\n");
prompt.append("- 时间要求、优先级或其他执行约束\n\n");
// 2. 工具能力分析
prompt.append("## 🔧 MCP工具能力分析结果\n");
prompt.append(mcpToolsAnalysis).append("\n\n");
// 3. 工具映射验证 - 使用动态获取的工具信息
prompt.append("## ✅ 工具映射验证要求\n");
prompt.append("**重要提醒:** 在生成执行步骤时,必须严格遵循以下工具映射规则:\n\n");
// 动态获取实际的MCP工具信息
String actualToolsInfo = getActualMcpToolsInfo();
prompt.append("### 可用工具清单\n");
prompt.append(actualToolsInfo).append("\n");
prompt.append("### 工具选择原则\n");
prompt.append("- **精确匹配**: 每个步骤必须使用上述工具清单中的确切函数名称\n");
prompt.append("- **功能对应**: 根据MCP工具分析结果中的匹配度选择最适合的工具\n");
prompt.append("- **参数完整**: 确保每个工具调用都包含必需的参数说明\n");
prompt.append("- **依赖关系**: 考虑工具间的数据流转和依赖关系\n\n");
// 4. 执行计划要求
prompt.append("## 📝 执行计划要求\n");
prompt.append("请基于上述用户详细需求、MCP工具分析结果和工具映射验证要求,生成精确的执行计划:\n\n");
prompt.append("### 核心要求\n");
prompt.append("1. **完整保留用户需求**: 必须将用户请求中的所有详细信息完整传递到每个执行步骤中\n");
prompt.append("2. **严格遵循MCP分析结果**: 必须根据工具能力分析中的匹配度和推荐方案制定步骤\n");
prompt.append("3. **精确工具映射**: 每个步骤必须使用确切的函数名称,不允许使用模糊或错误的工具名\n");
prompt.append("4. **参数完整性**: 所有工具调用必须包含用户原始需求中的完整参数信息\n");
prompt.append("5. **依赖关系明确**: 基于MCP分析结果中的执行策略建议安排步骤顺序\n");
prompt.append("6. **合理粒度**: 避免过度细分,每个步骤应该是完整且独立的功能单元\n\n");
// 4. 格式规范 - 通用化任务格式
prompt.append("### 格式规范\n");
prompt.append("请使用以下Markdown格式生成3-5个执行步骤:\n");
prompt.append("```markdown\n");
prompt.append("# 执行步骤规划\n\n");
prompt.append("[ ] 第1步:[步骤描述]\n");
prompt.append("[ ] 第2步:[步骤描述]\n");
prompt.append("[ ] 第3步:[步骤描述]\n");
prompt.append("...\n\n");
prompt.append("## 步骤详情\n\n");
prompt.append("### 第1步:[步骤描述]\n");
prompt.append("- **优先级**: [HIGH/MEDIUM/LOW]\n");
prompt.append("- **预估时长**: [分钟数]分钟\n");
prompt.append("- **使用工具**: [必须使用确切的函数名称]\n");
prompt.append("- **工具匹配度**: [引用MCP分析结果中的匹配度评估]\n");
prompt.append("- **依赖步骤**: [前置步骤序号,如无依赖则填写'无']\n");
prompt.append("- **执行方法**: [基于MCP分析结果的具体执行策略,包含工具调用参数]\n");
prompt.append("- **工具参数**: [详细的参数说明和示例值,必须包含用户原始需求中的所有相关信息]\n");
prompt.append("- **需求传递**: [明确说明如何将用户的详细要求传递到此步骤中]\n");
prompt.append("- **预期输出**: [期望的最终结果]\n");
prompt.append("- **成功标准**: [判断任务完成的标准]\n");
prompt.append("- **MCP分析依据**: [引用具体的MCP工具分析结论]\n\n");
prompt.append("```\n\n");
// 5. 动态规划指导原则
prompt.append("### 规划指导原则\n");
prompt.append("请根据用户详细请求和可用工具能力,动态生成合适的执行步骤:\n");
prompt.append("- **需求完整性原则**: 确保用户请求中的所有详细信息都被完整保留和传递\n");
prompt.append("- **步骤分离原则**: 每个步骤应该专注于单一功能,避免混合不同类型的操作\n");
prompt.append("- **工具映射原则**: 每个步骤应明确使用哪个具体的MCP工具\n");
prompt.append("- **参数传递原则**: 确保用户的详细要求能够准确传递到工具参数中\n");
prompt.append("- **依赖关系原则**: 合理安排步骤顺序,确保前置条件得到满足\n");
prompt.append("- **结果输出原则**: 每个步骤都应有明确的输出结果和成功标准\n\n");
// 6. 步骤类型指导
prompt.append("### 步骤类型指导\n");
prompt.append("根据可用工具和用户需求,常见的步骤类型包括:\n");
prompt.append("- **数据获取步骤**: 使用搜索、查询等工具获取所需信息\n");
prompt.append("- **数据处理步骤**: 对获取的信息进行分析、整理和加工\n");
prompt.append("- **内容生成步骤**: 基于处理后的数据生成目标内容\n");
prompt.append("- **结果输出步骤**: 将生成的内容发布、保存或传递给用户\n");
prompt.append("- **通知反馈步骤**: 向用户或相关方发送执行结果通知\n\n");
// 7. 执行要求
prompt.append("### 执行要求\n");
prompt.append("1. **步骤编号**: 使用第1步、第2步、第3步...格式\n");
prompt.append("2. **Markdown格式**: 严格按照上述Markdown格式输出\n");
prompt.append("3. **步骤描述**: 每个步骤描述要清晰、具体、可执行\n");
prompt.append("4. **优先级**: 根据步骤重要性和紧急程度设定\n");
prompt.append("5. **时长估算**: 基于步骤复杂度合理估算\n");
prompt.append("6. **工具选择**: 从可用工具中选择最适合的,必须使用完整的函数名称\n");
prompt.append("7. **依赖关系**: 明确步骤间的先后顺序\n");
prompt.append("8. **执行细节**: 提供具体可操作的方法,包含详细的参数说明和用户需求传递\n");
prompt.append("9. **需求传递**: 确保用户的所有详细要求都能准确传递到相应的执行步骤中\n");
prompt.append("10. **功能独立**: 确保每个步骤功能独立,避免混合不同类型的操作\n");
prompt.append("11. **工具映射**: 每个步骤必须明确指定使用的MCP工具函数名称\n");
prompt.append("12. **质量标准**: 设定明确的完成标准\n\n");
// 7. 步骤类型指导
prompt.append("### 常见步骤类型指导\n");
prompt.append("- **信息获取步骤**: 使用搜索工具,关注关键词选择和结果筛选\n");
prompt.append("- **内容处理步骤**: 基于获取的信息进行分析、整理和创作\n");
prompt.append("- **结果输出步骤**: 使用相应平台工具发布或保存处理结果\n");
prompt.append("- **通知反馈步骤**: 使用通信工具进行状态通知或结果反馈\n");
prompt.append("- **数据处理步骤**: 对获取的信息进行分析、转换和处理\n\n");
// 8. 质量检查
prompt.append("### 质量检查清单\n");
prompt.append("生成计划后请确认:\n");
prompt.append("- [ ] 每个步骤都有明确的序号和描述\n");
prompt.append("- [ ] 使用了正确的Markdown格式\n");
prompt.append("- [ ] 步骤描述清晰具体\n");
prompt.append("- [ ] 优先级设置合理\n");
prompt.append("- [ ] 时长估算现实可行\n");
prompt.append("- [ ] 工具选择恰当\n");
prompt.append("- [ ] 依赖关系清晰\n");
prompt.append("- [ ] 执行方法具体可操作\n");
prompt.append("- [ ] 成功标准明确可衡量\n\n");
prompt.append("现在请开始生成Markdown格式的执行步骤规划:\n");
return prompt.toString();
}
/**
* 获取实际的MCP工具信息
*/
private String getActualMcpToolsInfo() {
StringBuilder toolsInfo = new StringBuilder();
toolsInfo.append("# 当前注册的MCP工具列表\n\n");
try {
// 获取百度搜索工具信息
toolsInfo.append("## 1. 百度搜索工具 (BaiduSearch)\n");
toolsInfo.append("- **服务端点**: http://localhost:8080/mcp/baidu-search\n");
toolsInfo.append("- **核心功能**: 通过百度搜索引擎检索技术资料和信息\n");
toolsInfo.append("- **主要工具函数**: search\n");
toolsInfo.append("- **参数要求**: query(搜索关键词)\n");
toolsInfo.append("- **适用场景**: 技术资料搜索、信息收集、知识获取\n\n");
// 获取CSDN工具信息
toolsInfo.append("## 2. CSDN发布工具 (CsdnPublish)\n");
toolsInfo.append("- **服务端点**: http://localhost:8080/mcp/csdn\n");
toolsInfo.append("- **核心功能**: 发布技术文章到CSDN平台\n");
toolsInfo.append("- **主要工具函数**: publish_article\n");
toolsInfo.append("- **参数要求**: title(文章标题), content(文章内容), tags(标签)\n");
toolsInfo.append("- **适用场景**: 技术文章发布、知识分享、内容创作\n\n");
// 获取微信工具信息
toolsInfo.append("## 3. 微信通知工具 (WeixinNotify)\n");
toolsInfo.append("- **服务端点**: http://localhost:8080/mcp/weixin\n");
toolsInfo.append("- **核心功能**: 发送微信通知消息\n");
toolsInfo.append("- **主要工具函数**: send_message\n");
toolsInfo.append("- **参数要求**: message(消息内容), recipient(接收者)\n");
toolsInfo.append("- **适用场景**: 状态通知、结果反馈、任务提醒\n\n");
} catch (Exception e) {
log.warn("获取MCP工具信息时发生错误: {}", e.getMessage());
toolsInfo.append("## 工具信息获取失败\n");
toolsInfo.append("请检查MCP服务连接状态\n\n");
}
return toolsInfo.toString();
}
@Override
public StrategyHandler<ExecuteCommandEntity, DefaultFlowAgentExecuteStrategyFactory.DynamicContext, String> get(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
return step3ParseStepsNode;
}
}此步骤的目标是根据 用户的提问 与 MCP 的能力,设计出接下来的执行动作。Agent 会综合分析用户输入的需求内容,以及系统当前可调用的 MCP 工具与功能,生成一套可执行的行动计划,为后续的步骤拆解与执行提供明确方向。
4.3 拆分步骤
@Slf4j
@Service
public class Step3ParseStepsNode extends AbstractExecuteSupport {
@Resource
private Step4ExecuteStepsNode step4ExecuteStepsNode;
@Override
protected String doApply(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
log.info("\n--- 步骤3: 规划步骤解析 ---");
String planningResult = dynamicContext.getValue("planningResult");
if (planningResult == null || planningResult.trim().isEmpty()) {
log.warn("规划结果为空,无法解析步骤");
throw new RuntimeException("规划结果为空,无法解析步骤");
}
Map<String, String> stepsMap = parseExecutionSteps(planningResult);
log.info("成功解析 {} 个执行步骤", stepsMap.size());
// 保存解析结果到上下文
dynamicContext.setValue("stepsMap", stepsMap);
// 构建解析结果摘要
StringBuilder parseResult = new StringBuilder();
parseResult.append("## 步骤解析结果\n\n");
parseResult.append(String.format("成功解析 %d 个执行步骤:\n\n", stepsMap.size()));
for (Map.Entry<String, String> entry : stepsMap.entrySet()) {
parseResult.append(String.format("- **%s**: %s\n",
entry.getKey(),
entry.getValue().split("\n")[0])); // 只显示标题部分
}
// 发送SSE结果
AutoAgentExecuteResultEntity result = AutoAgentExecuteResultEntity.createAnalysisSubResult(
dynamicContext.getStep(),
"analysis_progress",
parseResult.toString(),
requestParameter.getSessionId());
sendSseResult(dynamicContext, result);
// 更新步骤
dynamicContext.setStep(dynamicContext.getStep() + 1);
return router(requestParameter, dynamicContext);
}
/**
* 解析执行步骤
*/
private Map<String, String> parseExecutionSteps(String planningResult) {
Map<String, String> stepsMap = new HashMap<>();
if (planningResult == null || planningResult.trim().isEmpty()) {
return stepsMap;
}
try {
// 使用正则表达式匹配步骤标题和详细内容
Pattern stepPattern = Pattern.compile("### (第\\d+步:[^\\n]+)([\\s\\S]*?)(?=### 第\\d+步:|$)");
Matcher matcher = stepPattern.matcher(planningResult);
while (matcher.find()) {
String stepTitle = matcher.group(1).trim();
String stepContent = matcher.group(2).trim();
// 提取步骤编号
Pattern numberPattern = Pattern.compile("第(\\d+)步:");
Matcher numberMatcher = numberPattern.matcher(stepTitle);
if (numberMatcher.find()) {
String stepNumber = "第" + numberMatcher.group(1) + "步";
String fullStepInfo = stepTitle + "\n" + stepContent;
stepsMap.put(stepNumber, fullStepInfo);
log.debug("解析步骤: {} -> {}", stepNumber, stepTitle);
}
}
// 如果没有匹配到详细步骤,尝试匹配简单的步骤列表
if (stepsMap.isEmpty()) {
Pattern simpleStepPattern = Pattern.compile("\\[ \\] (第\\d+步:[^\\n]+)");
Matcher simpleMatcher = simpleStepPattern.matcher(planningResult);
while (simpleMatcher.find()) {
String stepTitle = simpleMatcher.group(1).trim();
Pattern numberPattern = Pattern.compile("第(\\d+)步:");
Matcher numberMatcher = numberPattern.matcher(stepTitle);
if (numberMatcher.find()) {
String stepNumber = "第" + numberMatcher.group(1) + "步";
stepsMap.put(stepNumber, stepTitle);
log.debug("解析简单步骤: {} -> {}", stepNumber, stepTitle);
}
}
}
log.info("成功解析 {} 个执行步骤", stepsMap.size());
} catch (Exception e) {
log.error("解析规划结果时发生错误", e);
}
return stepsMap;
}
@Override
public StrategyHandler<ExecuteCommandEntity, DefaultFlowAgentExecuteStrategyFactory.DynamicContext, String> get(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
return step4ExecuteStepsNode;
}
}规划完成后,需要将整体方案拆解为多个独立的执行步骤。这样可以让后续节点按照步骤顺序依次循环执行,实现任务的有序推进与结果追踪。这一阶段的核心目标是:将高层次规划转化为可操作的步骤序列,为执行层提供清晰的执行路径。
4.4 执行步骤
@Slf4j
@Service
public class Step4ExecuteStepsNode extends AbstractExecuteSupport {
@Override
protected String doApply(ExecuteCommandEntity request, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
log.info("开始执行第四步:按顺序执行规划步骤");
try {
// 获取配置信息
AiAgentClientFlowConfigVO aiAgentClientFlowConfigVO = dynamicContext.getAiAgentClientFlowConfigVOMap().get(AiClientTypeEnumVO.EXECUTOR_CLIENT.getCode());
// 获取规划客户端
ChatClient executorChatClient = getChatClientByClientId(aiAgentClientFlowConfigVO.getClientId());
// 从动态上下文获取解析的步骤
Map<String, String> stepsMap = dynamicContext.getValue("stepsMap");
if (stepsMap == null || stepsMap.isEmpty()) {
return "步骤映射为空,无法执行";
}
// 按顺序执行规划步骤
executeStepsInOrder(executorChatClient, stepsMap, dynamicContext);
// 发送SSE结果
AutoAgentExecuteResultEntity result = AutoAgentExecuteResultEntity.createExecutionResult(
dynamicContext.getStep(),
"已完成所有规划步骤的执行",
request.getSessionId()
);
sendSseResult(dynamicContext, result);
// 更新步骤
dynamicContext.setStep(dynamicContext.getStep() + 1);
dynamicContext.setCompleted(true);
log.info("第四步执行完成:所有规划步骤已执行");
return "所有规划步骤执行完成";
} catch (Exception e) {
log.error("第四步执行失败", e);
return "执行步骤失败: " + e.getMessage();
}
}
@Override
public StrategyHandler<ExecuteCommandEntity, DefaultFlowAgentExecuteStrategyFactory.DynamicContext, String> get(ExecuteCommandEntity requestParameter, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) throws Exception {
return defaultStrategyHandler;
}
/**
* 按顺序执行规划步骤
*/
private void executeStepsInOrder(ChatClient executorChatClient, Map<String, String> stepsMap, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) {
if (stepsMap == null || stepsMap.isEmpty()) {
log.warn("步骤映射为空,无法执行");
return;
}
// 按步骤编号排序执行
List<Integer> stepNumbers = new ArrayList<>();
for (String stepKey : stepsMap.keySet()) {
try {
// 从"第1步"、"第2步"等格式中提取数字
Pattern numberPattern = Pattern.compile("第(\\d+)步");
Matcher matcher = numberPattern.matcher(stepKey);
if (matcher.find()) {
stepNumbers.add(Integer.parseInt(matcher.group(1)));
}
} catch (NumberFormatException e) {
log.warn("无法解析步骤编号: {}", stepKey);
}
}
// 排序步骤编号
stepNumbers.sort(Integer::compareTo);
// 按顺序执行每个步骤
for (Integer stepNumber : stepNumbers) {
String stepKey = "第" + stepNumber + "步";
String stepContent = null;
// 查找匹配的步骤内容
for (Map.Entry<String, String> entry : stepsMap.entrySet()) {
if (entry.getKey().startsWith(stepKey)) {
stepContent = entry.getValue();
break;
}
}
if (stepContent != null) {
executeStep(executorChatClient, stepNumber, stepKey, stepContent, dynamicContext);
} else {
log.warn("未找到步骤内容: {}", stepKey);
}
}
}
/**
* 执行单个步骤
*/
private void executeStep(ChatClient executorChatClient, Integer stepNumber, String stepKey, String stepContent, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) {
log.info("\n--- 开始执行 {} ---", stepKey);
log.info("步骤内容: {}", stepContent.substring(0, Math.min(200, stepContent.length())) + "...");
try {
// 更新执行上下文
dynamicContext.setValue("currentStep", stepNumber);
dynamicContext.setValue("currentStepKey", stepKey);
dynamicContext.setValue("currentStepContent", stepContent);
// 使用执行器ChatClient来执行具体步骤
String executionResult = executorChatClient.prompt()
.user(buildStepExecutionPrompt(stepContent, dynamicContext))
.call()
.content();
assert executionResult != null;
log.info("步骤 {} 执行结果: {}", stepNumber, executionResult.substring(0, Math.min(150, executionResult.length())) + "...");
// 保存执行结果
dynamicContext.setValue("step" + stepNumber + "Result", executionResult);
// 发送步骤执行结果的SSE
AutoAgentExecuteResultEntity stepResult = AutoAgentExecuteResultEntity.createExecutionResult(
stepNumber,
stepKey + " 执行完成: " + executionResult.substring(0, Math.min(500, executionResult.length())),
(String) dynamicContext.getValue("sessionId")
);
sendSseResult(dynamicContext, stepResult);
// 短暂延迟,避免请求过于频繁
Thread.sleep(1000);
} catch (Exception e) {
log.error("执行步骤 {} 时发生错误: {}", stepNumber, e.getMessage());
dynamicContext.setValue("step" + stepNumber + "Error", e.getMessage());
// 记录错误但继续执行下一步
handleStepExecutionError(stepNumber, stepKey, e, dynamicContext);
}
log.info("--- 完成执行 {} ---", stepKey);
}
/**
* 处理步骤执行错误
*/
private void handleStepExecutionError(Integer stepNumber, String stepKey, Exception e, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) {
log.warn("步骤 {} 执行失败,尝试恢复策略", stepNumber);
// 记录错误统计
Map<String, Integer> errorStats = dynamicContext.getValue("stepErrorStats");
if (errorStats == null) {
errorStats = new HashMap<>();
dynamicContext.setValue("stepErrorStats", errorStats);
}
errorStats.put("step" + stepNumber, errorStats.getOrDefault("step" + stepNumber, 0) + 1);
// 如果是网络错误,可以尝试重试
if (e.getMessage() != null && (e.getMessage().contains("timeout") || e.getMessage().contains("connection"))) {
log.info("检测到网络错误,将在后续重试机制中处理");
}
// 标记步骤为部分完成状态
dynamicContext.setValue("step" + stepNumber + "Status", "FAILED_WITH_ERROR");
// 发送错误结果的SSE
try {
AutoAgentExecuteResultEntity errorResult = AutoAgentExecuteResultEntity.createExecutionResult(
stepNumber,
stepKey + " 执行失败: " + e.getMessage(),
dynamicContext.getValue("sessionId")
);
sendSseResult(dynamicContext, errorResult);
} catch (Exception sseException) {
log.error("发送错误SSE结果失败", sseException);
}
}
/**
* 构建步骤执行提示词
*/
private String buildStepExecutionPrompt(String stepContent, DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext) {
return "你是一个智能执行助手,需要执行以下步骤:\n\n" +
"**步骤内容:**\n" +
stepContent + "\n\n" +
"**用户原始请求:**\n" +
dynamicContext.getCurrentTask() + "\n\n" +
"**执行要求:**\n" +
"1. 仔细分析步骤内容,理解需要执行的具体任务\n" +
"2. 如果涉及MCP工具调用,请使用相应的工具\n" +
"3. 提供详细的执行过程和结果\n" +
"4. 如果遇到问题,请说明具体的错误信息\n" +
"5. **重要**: 执行完成后,必须在回复末尾明确输出执行结果,格式如下:\n" +
" ```\n" +
" === 执行结果 ===\n" +
" 状态: [成功/失败]\n" +
" 结果描述: [具体的执行结果描述]\n" +
" 输出数据: [如果有具体的输出数据,请在此列出]\n" +
" ```\n\n" +
"请开始执行这个步骤,并严格按照要求提供详细的执行报告和结果输出。";
}
}四、测试验证
1. 功能测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class FlowAgentExecuteTest {
@Resource
private DefaultArmoryStrategyFactory defaultArmoryStrategyFactory;
@Resource
private DefaultFlowAgentExecuteStrategyFactory defaultFlowAgentExecuteStrategyFactory;
@Resource
private ApplicationContext applicationContext;
@Before
public void init() throws Exception {
StrategyHandler<ArmoryCommandEntity, DefaultArmoryStrategyFactory.DynamicContext, String> armoryStrategyHandler =
defaultArmoryStrategyFactory.armoryStrategyHandler();
String apply = armoryStrategyHandler.apply(
ArmoryCommandEntity.builder()
.commandType(AiAgentEnumVO.AI_CLIENT.getCode())
.commandIdList(Arrays.asList("2101", "2102", "2103"))
.build(),
new DefaultArmoryStrategyFactory.DynamicContext());
ChatClient chatClient = (ChatClient) applicationContext.getBean(AiAgentEnumVO.AI_CLIENT.getBeanName("2101"));
log.info("客户端构建:{}", chatClient);
}
@Test
public void testFlowAgentExecute() throws Exception {
StrategyHandler<ExecuteCommandEntity, DefaultFlowAgentExecuteStrategyFactory.DynamicContext, String> executeHandler
= defaultFlowAgentExecuteStrategyFactory.armoryStrategyHandler();
ExecuteCommandEntity executeCommandEntity = new ExecuteCommandEntity();
executeCommandEntity.setAiAgentId("1");
executeCommandEntity.setMessage("""
我需要你帮我生成一篇文章,要求如下;
1. 场景为互联网大厂java求职者面试
2. 提问的技术栈如下;
核心语言与平台: Java SE (8/11/17), Jakarta EE (Java EE), JVM
构建工具: Maven, Gradle, Ant
Web框架: Spring Boot, Spring MVC, Spring WebFlux, Jakarta EE, Micronaut, Quarkus, Play Framework, Struts (Legacy)
数据库与ORM: Hibernate, MyBatis, JPA, Spring Data JDBC, HikariCP, C3P0, Flyway, Liquibase
测试框架: JUnit 5, TestNG, Mockito, PowerMock, AssertJ, Selenium, Cucumber
微服务与云原生: Spring Cloud, Netflix OSS (Eureka, Zuul), Consul, gRPC, Apache Thrift, Kubernetes Client, OpenFeign, Resilience4j
安全框架: Spring Security, Apache Shiro, JWT, OAuth2, Keycloak, Bouncy Castle
消息队列: Kafka, RabbitMQ, ActiveMQ, JMS, Apache Pulsar, Redis Pub/Sub
缓存技术: Redis, Ehcache, Caffeine, Hazelcast, Memcached, Spring Cache
日志框架: Log4j2, Logback, SLF4J, Tinylog
监控与运维: Prometheus, Grafana, Micrometer, ELK Stack, New Relic, Jaeger, Zipkin
模板引擎: Thymeleaf, FreeMarker, Velocity, JSP/JSTL
REST与API工具: Swagger/OpenAPI, Spring HATEOAS, Jersey, RESTEasy, Retrofit
序列化: Jackson, Gson, Protobuf, Avro
CI/CD工具: Jenkins, GitLab CI, GitHub Actions, Docker, Kubernetes
大数据处理: Hadoop, Spark, Flink, Cassandra, Elasticsearch
版本控制: Git, SVN
工具库: Apache Commons, Guava, Lombok, MapStruct, JSch, POI
AI:Spring AI, Google A2A, MCP(模型上下文协议), RAG(检索增强生成), Agent(智能代理), 聊天会话内存, 工具执行框架, 提示填充, 向量化, 语义检索, 向量数据库(Milvus/Chroma/Redis), Embedding模型(OpenAI/Ollama), 客户端-服务器架构, 工具调用标准化, 扩展能力, Agentic RAG, 文档加载, 企业文档问答, 复杂工作流, 智能客服系统, AI幻觉(Hallucination), 自然语言语义搜索
其他: JUnit Pioneer, Dubbo, R2DBC, WebSocket
3. 提问的场景方案可包括但不限于;音视频场景,内容社区与UGC,AIGC,游戏与虚拟互动,电商场景,本地生活服务,共享经济,支付与金融服务,互联网医疗,健康管理,医疗供应链,企业协同与SaaS,产业互联网,大数据与AI服务,在线教育,求职招聘,智慧物流,供应链金融,智慧城市,公共服务数字化,物联网应用,Web3.0与区块链,安全与风控,广告与营销,能源与环保。
4. 按照故事场景,以严肃的面试官和搞笑的水货程序员谢飞机进行提问,谢飞机对简单问题可以回答出来,回答好了面试官还会夸赞和引导。复杂问题含糊其辞,回答的不清晰。
5. 每次进行3轮提问,每轮可以有3-5个问题。这些问题要有技术业务场景上的衔接性,循序渐进引导提问。最后是面试官让程序员回家等通知类似的话术。
6. 提问后把问题的答案详细的,写到文章最后,讲述出业务场景和技术点,让小白可以学习下来。
根据以上内容,不要阐述其他信息,请直接提供;文章标题(需要含带技术点)、文章内容、文章标签(多个用英文逗号隔开)、文章简述(300字)
将以上内容发布文章到CSDN
之后进行,微信公众号消息通知,平台:CSDN、主题:为文章标题、描述:为文章简述、跳转地址:为发布文章到CSDN获取 http url 文章地址
""");
executeCommandEntity.setSessionId("flow-session-id-" + System.currentTimeMillis());
executeCommandEntity.setMaxStep(4);
// 创建动态上下文
DefaultFlowAgentExecuteStrategyFactory.DynamicContext dynamicContext = new DefaultFlowAgentExecuteStrategyFactory.DynamicContext();
dynamicContext.setMaxStep(executeCommandEntity.getMaxStep());
dynamicContext.setExecutionHistory(new StringBuilder());
dynamicContext.setCurrentTask(executeCommandEntity.getMessage());
String apply = executeHandler.apply(executeCommandEntity, dynamicContext);
log.info("Flow执行结果:{}", apply);
}
}在执行前,需要先获取 OpenAI 对接 Key,并将其存入数据库中,以便后续任务能够正常调用接口。
本步骤主要负责 流程的实际执行与验证,用于测试整体链路是否运行正常。
需要注意的是,如果在执行过程中频繁向 CSDN 发帖,可能会触发平台的风控机制而被拒绝,请合理控制调用频率。
2. 验证结果

最终测试后,你可以看到执行的代码,发帖的内容,内容的通知:https://blog.csdn.net/qq_34968019/article/details/153203581?spm=1001.2014.3001.5501