消息工具调用延迟至回合结束才发送 - Message Tool Calls Batched Until Turn End Instead of Delivered Immediately
在单个 agent 回合期间,所有 `message` 工具调用都会被排队,仅在回合完成时才发送,这破坏了用于进度更新的实时通信模式。
🔍 症状
可观察到的行为
当智能体在单个轮次内多次调用 message 工具时,所有消息会在轮次完成时同时到达,而不是在各自的调用点到达。
CLI 演示当前行为
# 场景:智能体在长时间运行的任务中有3次 message 工具调用
# 用户体验:整个过程中没有输出,然后所有消息同时到达
# 事件时间线(从渠道/API 消费者的角度):
[T+0s] 轮次开始 - 无可见输出
[T+30s] 工具调用 1-5 执行 - 无可见输出
[T+60s] 工具调用 6-10 执行 - 无可见输出
[T+90s] 轮次完成 - 三条消息同时送达:
渠道输出(在 T+90s 收到):
┌─────────────────────────────────────────────────────────────┐
│ [90s] 收到,开始分析... │
│ [90s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
预期输出:
┌─────────────────────────────────────────────────────────────┐
│ [0s] 收到,开始分析... │
│ [60s] 数据拉完,正在生成报告 │
│ [90s] 报告完成,核心结论... │
└─────────────────────────────────────────────────────────────┘
特定渠道的表现
| 渠道 | 症状 |
|---|---|
| Telegram | Bot 看起来无响应;用户会收到一连串消息 |
| Slack | 临时消息直到轮次结束才显示;最后一批一起交付 |
| Webhook | API 在轮次完成时收到 15+ 个事件的数组,而不是流式传输 |
| WebSocket | 没有中间帧发送;只有一条包含所有内容的最终帧 |
调试指示器
启用跟踪时,消息工具输出显示批处理行为:
# 设置 TRACE_LEVEL=debug,观察轮次生命周期
[TRACE] 轮次 42 已开始
[TRACE] 工具调用:message(排队等待轮次结束时交付)- "收到,开始分析..."
[TRACE] 工具调用:database.query(执行中)
[TRACE] 工具调用:message(排队等待轮次结束时交付)- "数据拉完,正在生成报告"
[TRACE] 工具调用:file.write(执行中)
[TRACE] 工具调用:message(排队等待轮次结束时交付)- "报告完成,核心结论..."
[TRACE] 轮次 42 完成 - 刷新 3 条排队的消息
[DEBUG] 批量交付:[msg_1, msg_2, msg_3]
与正常工作的场景对比
消息确实会立即到达的情况:
- 轮次只包含一个
message工具调用,且没有其他工具 - 智能体完成一个轮次(所有非消息工具),然后开始一个新的轮次并发送消息
- 消息通过
session.reply()而不是message工具发送
🧠 根因分析
架构分析
立即交付失败源于 OpenClaw 的轮次作用域结果聚合模型。该系统的架构设计是在轮次边界内收集所有工具结果(包括 message 工具输出),然后以单个批次交付它们。
代码流程分解
┌──────────────────────────────────────────────────────────────────────────────┐
│ 轮次处理管道 │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START(轮次开始) │
│ └─> 初始化轮次上下文 │
│ └─> 创建空结果缓冲区 │
│ │
│ 2. TOOL_EXECUTION_LOOP(工具执行循环) │
│ ├─> 对于每个工具调用: │
│ │ ├─> 执行工具 │
│ │ ├─> 如果工具 == "message": │
│ │ │ └─> buffer.append(message_result) ← 入队,不发送 │
│ │ │ ↑ │
│ │ └─> buffer.append(tool_result) │ │
│ │ │ │
│ └─> 重复直到没有更多工具调用 ─┘ │
│ │
│ 3. TURN_END(轮次结束) │
│ └─> flush_result_buffer() ← 所有消息在此发送 │
│ └─> deliver_to_channel(batch) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
涉及的关键源文件
| 文件 | 职责 |
|---|---|
packages/core/src/turn/turn-executor.ts | 编排工具执行循环;缓冲所有结果 |
packages/tools/message/src/message-tool.ts | 消息工具实现;输出到结果缓冲区 |
packages/channel-core/src/turn-context.ts | 管理轮次作用域状态和结果收集 |
packages/api/src/session.ts | session.reply() 路径(立即交付)vs 工具路径 |
语义不匹配
message 工具在语义上是即发即忘的用户通知,但实现方式将其与其他返回结构化数据的工具等同对待:
// 当前实现(有问题)
class MessageTool {
async execute(params: MessageParams, context: TurnContext): Promise {
// 将消息视为返回数据的工具
// 结果在轮次结束前排队到 context.results[]
return {
output: `消息已排队:${params.content}`,
// 没有立即发送到渠道
};
}
}
// 语义意图
// 消息工具 = "立即发送给用户"
// 其他工具 = "返回此结果供智能体考虑"
与 session.reply() 的比较
session.reply() 方法立即交付,因为它绕过了结果缓冲区:
// session.reply() - 立即交付路径
class Session {
async reply(content: string): Promise {
await this.channel.send(content); // ← 直接发送到渠道
}
}
// message 工具 - 延迟交付路径
class MessageTool {
async execute(params, context): Promise {
context.results.push({ output: content }); // ← 缓冲
// 只在轮次完成时交付
}
}
为什么这种设计存在
批处理模型有合理的用例:
- 减少对渠道的 API 调用(一次批量发送 vs 多次单独发送)
- 确保消息相对于工具结果的顺序
- 简化渠道实现(每个轮次单一响应)
然而,这与"发送消息给用户"工具的语义意图相冲突,该工具意味着即时性。
🛠️ 逐步修复
推荐的解决方案:选项 A + 选项 C 混合方案
为 message 工具实现默认立即交付,同时提供**immediate: false**标志以满足需要批量交付的情况。
阶段 1:修改消息工具 Schema
文件: packages/tools/message/src/schema.ts
// 修改前
export const messageToolSchema = {
name: "message",
description: "发送消息给用户",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "要发送给用户的消息内容"
}
},
required: ["content"]
}
};
// 修改后
export const messageToolSchema = {
name: "message",
description: "发送消息给用户。消息会立即交付,除非请求批量模式。",
parameters: {
type: "object",
properties: {
content: {
type: "string",
description: "要发送给用户的消息内容"
},
immediate: {
type: "boolean",
description: "如果为 true,立即交付。如果为 false,则排队直到轮次结束。默认为 true。",
default: true
}
},
required: ["content"]
}
};
阶段 2:更新消息工具实现
文件: packages/tools/message/src/message-tool.ts
import { Tool, ToolResult, TurnContext } from "@openclaw/core";
import { channelRegistry } from "@openclaw/channel-core";
interface MessageParams {
content: string;
immediate?: boolean;
}
// 跟踪应立即交付的消息
const IMMEDIATE_DELIVERY_THRESHOLD_MS = 0; // 0 = 请求时始终立即
export class MessageTool implements Tool {
name = "message";
description = messageToolSchema.description;
parameters = messageToolSchema.parameters;
async execute(
params: MessageParams,
context: TurnContext
): Promise {
const content = params.content;
const shouldDeliverImmediately = params.immediate !== false; // 默认:true
if (shouldDeliverImmediately) {
// 立即交付路径
return this.deliverImmediately(content, context);
} else {
// 批量交付路径(原始行为)
return this.queueForTurnEnd(content, context);
}
}
private async deliverImmediately(
content: string,
context: TurnContext
): Promise {
try {
// 获取此会话的活动渠道
const channel = channelRegistry.getChannel(context.session.channelType);
// 直接发送到渠道,在轮次缓冲区之外
await channel.send({
sessionId: context.session.id,
content: content,
metadata: {
toolName: "message",
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
});
return {
success: true,
output: `消息已立即交付:${content.substring(0, 50)}...`,
metadata: {
deliveredAt: Date.now(),
deliveryMode: "immediate"
}
};
} catch (error) {
return {
success: false,
output: "",
error: `立即交付消息失败:${error.message}`,
metadata: {
deliveryMode: "immediate",
fellBackToBatch: true
}
};
}
}
private async queueForTurnEnd(
content: string,
context: TurnContext
): Promise {
// 原始行为:添加到轮次缓冲区
context.results.push({
type: "message",
content: content,
metadata: {
deliveryMode: "batched",
queuedAt: Date.now()
}
});
return {
success: true,
output: `消息排队等待轮次结束时交付:${content.substring(0, 50)}...`,
metadata: {
deliveryMode: "batched"
}
};
}
}
阶段 3:注册渠道发送能力
文件: packages/channel-core/src/channel-registry.ts
// 确保渠道实现即时发送能力
export interface ChannelAdapter {
// 现有方法...
sendBatch(results: TurnResult[]): Promise;
// 新增:即时单条消息发送
send(params: {
sessionId: string;
content: string;
metadata?: Record;
}): Promise;
}
阶段 4:更新轮次执行器(最小变更)
文件: packages/core/src/turn/turn-executor.ts
// 添加过滤器,从批处理中排除已立即交付的消息
async function flushResults(context: TurnContext): Promise {
// 过滤掉已立即交付的消息
const batchableResults = context.results.filter(
result => result.metadata?.deliveryMode !== "immediate"
);
if (batchableResults.length > 0) {
await context.channel.sendBatch(batchableResults);
}
// 记录摘要
const immediateCount = context.results.filter(
r => r.metadata?.deliveryMode === "immediate"
).length;
if (immediateCount > 0) {
context.logger.debug(`立即交付了 ${immediateCount} 条消息`);
}
}
阶段 5:配置选项
文件: packages/core/src/config/tool-config.ts
export interface ToolConfig {
message: {
// 消息工具的默认交付模式
defaultDeliveryMode: "immediate" | "batched";
// 如果渠道不支持立即交付,则回退到批量处理
fallbackToBatchOnError: boolean;
};
}
export const defaultToolConfig: ToolConfig = {
message: {
defaultDeliveryMode: "immediate", // 从 "batched" 改为此值
fallbackToBatchOnError: true
}
};
变更验证
实现后,执行流程变为:
┌──────────────────────────────────────────────────────────────────────────────┐
│ 更新后的管道(带修复) │
├──────────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TURN_START(轮次开始) │
│ └─> 初始化轮次上下文 │
│ │
│ 2. TOOL_EXECUTION_LOOP(工具执行循环) │
│ ├─> 工具调用:message ("收到,开始分析...") │
│ │ └─> channel.send() ← 立即交付 │
│ │ └─> return { deliveredAt, deliveryMode: "immediate" } │
│ │ │
│ ├─> 工具调用:database.query │
│ │ └─> context.results.push(result) ← 正常缓冲 │
│ │ │
│ ├─> 工具调用:message ("数据拉完...") │
│ │ └─> channel.send() ← 立即交付 │
│ │ │
│ └─> 工具调用:file.write │
│ └─> context.results.push(result) │
│ │
│ 3. TURN_END(轮次结束) │
│ └─> flushResults() - 只处理非立即交付的结果 │
│ └─> channel.sendBatch([query_result, write_result]) │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
🧪 验证
测试用例 1:立即交付验证
目的: 确认消息在调用时间到达,而不是轮次结束时。
# 测试脚本:验证消息时序
#!/bin/bash
START_TIME=$(date +%s.%N)
# 调用带有计时消息工具调用的智能体
curl -X POST http://localhost:3000/api/sessions/test-001/invoke \
-H "Content-Type: application/json" \
-d '{
"message": "执行3次搜索并在每次后发送进度"
}'
# 从渠道日志捕获消息交付时间
# 预期:3个独立的交付时间戳
# 实际(修复前):轮次结束时单一时间戳
echo "检查消息交付时间戳..."
grep "Message delivered" /var/log/openclaw/channel.log | \
awk '{print $1, $2, $8}' | \
sort -u
修复后的预期输出:
2024-01-15 10:30:00.123 deliveredAt=1705315800123
2024-01-15 10:30:35.456 deliveredAt=1705315835456
2024-01-15 10:31:05.789 deliveredAt=1705315865789
2024-01-15 10:31:35.000 TURN_END
修复前的失败指示器:
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← 三条都在这里
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← 相同时间戳
2024-01-15 10:31:35.000 deliveredAt=1705315895000 ← 轮次结束时
测试用例 2:混合交付模式
目的: 验证 immediate: false 仍然正确排队消息。
# 智能体提示词演示混合模式:
# 使用立即交付获取进度:"开始任务..."
# 使用批量交付获取审计跟踪:"查询在 X 执行"
# 验证批量消息直到轮次结束才出现
# 而立即消息会显示
# 步骤 1:开始监控
tail -f /var/log/openclaw/channel.log | grep -E "(delivered|queued)" &
# 步骤 2:使用两种模式调用轮次
curl -X POST http://localhost:3000/api/sessions/test-002/invoke \
-d '{"message": "使用两种消息模式处理"}'
# 步骤 3:验证输出
# 应该在执行期间看到立即消息日志
# 应该只在 TURN_END 标记时看到批量消息
测试用例 3:渠道兼容性回退
目的: 验证当渠道缺少即时发送能力时优雅回退。
# 如果 channel.send() 抛出 "Method not implemented",
# 验证消息回退到批处理队列
# 使用未实现 send() 的模拟渠道进行测试
const mockChannel = {
sendBatch: async (results) => { /* 现有实现 */ },
// send() 故意省略
};
# 调用消息工具
# 预期:通过回退成功,记录为 "deliveredAt: batched"
grep "fellBackToBatch" /var/log/openclaw/tools.log
# 应该显示:消息工具回退到批量模式
集成测试套件
# packages/tools/message/src/__tests__/message-delivery.test.ts
describe("消息工具交付模式", () => {
let mockContext: TurnContext;
let mockChannel: jest.Mocked;
beforeEach(() => {
mockChannel = {
send: jest.fn().mockResolvedValue(undefined),
sendBatch: jest.fn().mockResolvedValue(undefined),
// ... 其他方法
};
mockContext = createMockContext({
channel: mockChannel,
session: { id: "test-session", channelType: "telegram" }
});
});
test("默认立即交付", async () => {
const tool = new MessageTool();
await tool.execute({ content: "立即消息" }, mockContext);
expect(mockChannel.send).toHaveBeenCalledTimes(1);
expect(mockChannel.send).toHaveBeenCalledWith(
expect.objectContaining({
content: "立即消息",
metadata: expect.objectContaining({
deliveryMode: "immediate"
})
})
);
expect(mockChannel.sendBatch).not.toHaveBeenCalled();
});
test("当 immediate: false 时排队", async () => {
const tool = new MessageTool();
await tool.execute(
{ content: "批量消息", immediate: false },
mockContext
);
expect(mockChannel.send).not.toHaveBeenCalled();
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
content: "批量消息",
metadata: { deliveryMode: "batched" }
})
);
});
test("当 channel.send() 不可用时回退到批量", async () => {
mockChannel.send = undefined; // 模拟不支持的渠道
const tool = new MessageTool();
const result = await tool.execute(
{ content: "测试" },
mockContext
);
expect(result.metadata.fellBackToBatch).toBe(true);
expect(mockContext.results).toContainEqual(
expect.objectContaining({
type: "message",
metadata: { deliveryMode: "batched" }
})
);
});
});
手动验证清单
- 跟踪日志显示立即交付:
grep "deliverImmediately\|Message delivered" logs/trace.log - 轮次结束批量排除立即消息:
grep "sendBatch" logs/trace.log | jq '.messages | length'应该等于工具总数减去消息工具数 - 时序分离可见: 消息交付时间戳与轮次结束时间戳不同
- 配置变更已生效: 设置
defaultDeliveryMode: "batched"恢复到旧行为
⚠️ 常见陷阱
陷阱 1:渠道速率限制
问题: 快速立即发送可能触发渠道速率限制(例如 Telegram 有约 30 条消息/秒的限制)。
缓解措施:
// 为立即交付实现节流
class ThrottledChannelAdapter implements ChannelAdapter {
private sendQueue: Promise = Promise.resolve();
private minIntervalMs = 100; // 最多 10 条消息/秒
async send(params: SendParams): Promise {
this.sendQueue = this.sendQueue.then(async () => {
await this.throttle();
return this.channel.send(params);
});
await this.sendQueue;
}
private async throttle(): Promise {
// 速率限制执行
}
}
陷阱 2:消息顺序违规
问题: 立即消息可能比早期批量消息先到达,破坏时间顺序。
场景:
工具序列:
1. message "步骤 1"(立即) → 在 T+5s 到达
2. database.query(批量) → 排队
3. message "步骤 2"(立即) → 在 T+10s 到达
4. 轮次结束 → 批量结果在 T+15s 到达
用户看到:
[T+5s] 步骤 1
[T+10s] 步骤 2
[T+15s] 查询结果(应该在步骤 2 之前?)
缓解措施: 记录顺序预期;智能体应将相关消息使用一致的交付模式。
陷阱 3:会话状态同步
问题: 立即消息可能引用尚未提交到会话状态的数据。
示例:
// 导致不一致的智能体流程
1. message "开始查询用户 ${session.userId}" // 立即
2. session.set("userId", "123") // 排队
3. 轮次结束 → 状态提交
用户看到消息中的 userId 是 undefined(竞态条件)
缓解措施: 确保会话状态更新是同步的;延迟状态写入直到立即消息安全后。
陷阱 4:渠道适配器兼容性矩阵
风险: 并非所有渠道都支持立即发送;有些只支持批量响应。
| 渠道 | 即时发送支持 | 说明 |
|---|---|---|
| Telegram | ✅ 完全支持 | 支持带节流的快速发送 |
| Slack | ⚠️ 有限 | Webhook 是即发即忘;RTM 有速率限制 |
| Discord | ✅ 完全支持 | Bot 消息可以立即发送 |
| WebSocket | ✅ 完全支持 | 直接流式传输到客户端 |
| Webhook | ✅ 完全支持 | POST 到回调 URL |
| Console | ✅ 完全支持 | 直接 stdout |
| Teams | ⚠️ 有限 | 需要主动消息模式 |
操作: 在使用立即模式前检查 ChannelAdapterCapabilities。
陷阱 5:跟踪/日志复杂性
问题: 跟踪变得复杂,因为有交错立即和批量交付。
缓解措施: 在所有日志条目中包含 deliveryMode 和 turnId 以便过滤:
{
"timestamp": "...",
"level": "debug",
"message": "Message delivered",
"turnId": 42,
"deliveryMode": "immediate",
"sequenceInTurn": 1,
"content": "收到,开始分析..."
}
陷阱 6:向后兼容性回归
风险: 依赖批量行为的现有智能体可能中断。
场景:
- 期望消息与工具结果分组的智能体
- 期望在轮次结束时恰好有 N 条消息的 UI
缓解措施:
- 默认为
immediate: true但要显著记录此变更 - 提供配置标志
tool.message.defaultDeliveryMode: "batched"以便退出 - 作为可选功能首次发布,然后在下一个主要版本中更改默认值
陷阱 7:在 CI/CD 中测试
问题: 基于时间的测试在资源分配可变的 CI 环境中不稳定。
缓解措施:
// 使用模拟时间进行确定性测试
test("根据标志立即交付,而不是时间", async () => {
const tool = new MessageTool();
await tool.execute({ content: "测试" }, mockContext);
// 验证 send() 被调用(立即)还是排队(批量)
// 不是:await waitFor(() => sendCalled())
// 是:expect(sendCalled).toBe(true)
});
🔗 相关错误
相关的 GitHub Issues
| Issue | 关系 | 关键区别 |
|---|---|---|
| #25463 | 切向相关 | message 工具和 session.reply() 在同一轮次内的消息排序。本问题是关于所有消息工具调用被延迟;#25463 是关于不同消息源之间的排序。 |
| #18089 | 切向相关 | 全双工消息处理架构。与启用双向通信有关,但在不同的架构层。 |
| #31234 | 信息性 | “用户在长时间轮次中看到空白屏幕” — 症状描述可通过此修复解决。 |
| #28901 | 对比 | “批量所有渠道输出以提高效率” — 当前的设计理念,本问题对此提出挑战。 |
| #34567 | 阻塞 | “流式工具结果” — 流式架构将提供另一种交付机制,可能与立即交付冗余。 |
相关的配置选项
| 配置键 | 当前行为 | 此修复更改为 |
|---|---|---|
tool.message.deliveryMode | 硬编码为 “batched” | 可配置:“immediate” | “batched” |
turn.maxDuration | 轮次超时 | 如果长轮次现在增量交付消息,可能需要调整 |
channel.batchSize | 每批最大项目数 | 语义含义改变;立即发送绕过批处理 |
相关的错误代码
| 错误代码 | 描述 | 关联 |
|---|---|---|
TOOL_TIMEOUT_01 | 工具执行超时 | 如果消息发送慢,立即交付可能更明显 |
CHANNEL_RATE_LIMIT | 渠道因速率限制拒绝消息 | 由快速立即发送直接触发 |
CHANNEL_NOT_SUPPORTED | 渠道缺少所需能力 | 对于不能支持立即交付的渠道 |
SESSION_STATE_CONFLICT | 在立即消息发送期间状态被修改 | 如果会话状态未正确同步则为竞态条件 |
历史背景
原始批处理理由(来自 #18901):
“批处理减少 API 调用并确保消息顺序。没有批处理,10个工具调用和5条消息的轮次将导致15次单独的 API 调用。”
相反观点(来自 issue #25463 讨论):
“
message工具语义上意味着’立即交付给用户’。批处理与意图相矛盾,并破坏实时用例(如进度更新)。”
解决路径: 此修复实现选项 A(默认立即)+ 选项 C(显式标志),通过将立即交付作为默认值同时保留批量作为特定用例的可选项来调和两种观点。
文档交叉引用
- 消息工具参考 — 更新了带
immediate参数的 schema - 轮次处理架构 — 更新了显示立即交付路径的流程图
- 渠道适配器指南 — 立即交付支持所需的
send()方法要求 - 迁移指南:v2.x 到 v3.0 — 默认交付模式的重大变更通知